- Notifications
You must be signed in to change notification settings - Fork 32
Bugfix/closeable seq finalize #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
de31821 d19078a 21caa79 a32a03a ab546ee File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,8 @@ | ||
| {:lint-as {byte-streams.utils/defprotocol+ clojure.core/defprotocol | ||
| byte-streams.utils/deftype+ clojure.core/deftype | ||
| byte-streams.utils/defrecord+ clojure.core/defrecord | ||
| byte-streams.utils/definterface+ clojure.core/definterface}} | ||
| {:lint-as {byte-streams.utils/defprotocol+ clojure.core/defprotocol | ||
| byte-streams.utils/deftype+ clojure.core/deftype | ||
| byte-streams.utils/defrecord+ clojure.core/defrecord | ||
| byte-streams.utils/definterface+ clojure.core/definterface | ||
| clj-commons.byte-streams.utils/defprotocol+ clojure.core/defprotocol | ||
| clj-commons.byte-streams.utils/deftype+ clojure.core/deftype | ||
| clj-commons.byte-streams.utils/defrecord+ clojure.core/defrecord | ||
| clj-commons.byte-streams.utils/definterface+ clojure.core/definterface}} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -16,3 +16,4 @@ dump* | |
| DS_Store | ||
| .nrepl* | ||
| .clj-kondo/.cache | ||
| .eastwood | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -9,46 +9,40 @@ | |
| [protocols :as proto] | ||
| [pushback-stream :as ps] | ||
| [char-sequence :as cs]] | ||
| [clojure.java.io :as io] | ||
| [clj-commons.primitive-math :as p]) | ||
| (:import | ||
| [clj_commons.byte_streams | ||
| Utils | ||
| ByteBufferInputStream] | ||
| [clj_commons.byte_streams.graph | ||
| Type] | ||
| [java.nio | ||
| ByteBuffer | ||
| DirectByteBuffer] | ||
| [java.lang.reflect | ||
| Array] | ||
| [java.util.concurrent.atomic | ||
| AtomicBoolean] | ||
| [java.io | ||
| File | ||
| FileOutputStream | ||
| FileInputStream | ||
| ByteArrayInputStream | ||
| ByteArrayOutputStream | ||
| PipedOutputStream | ||
| PipedInputStream | ||
| DataInputStream | ||
| InputStream | ||
| OutputStream | ||
| IOException | ||
| RandomAccessFile | ||
| Reader | ||
| InputStreamReader | ||
| BufferedReader] | ||
| [java.nio.channels | ||
| ReadableByteChannel | ||
| WritableByteChannel | ||
| FileChannel | ||
| FileChannel$MapMode | ||
| Channels | ||
| Pipe] | ||
| [java.nio.channels.spi | ||
| AbstractSelectableChannel])) | ||
| (clj_commons.byte_streams | ||
| Utils | ||
| ByteBufferInputStream) | ||
| (clj_commons.byte_streams.graph | ||
| Type) | ||
| (java.nio | ||
| ByteBuffer) | ||
| (java.lang.reflect | ||
| Array) | ||
| (java.io | ||
| File | ||
| ByteArrayInputStream | ||
| ByteArrayOutputStream | ||
| PipedOutputStream | ||
| PipedInputStream | ||
| DataInputStream | ||
| InputStream | ||
| OutputStream | ||
| IOException | ||
| Reader | ||
| BufferedReader | ||
| InputStreamReader) | ||
| (java.nio.channels | ||
| ReadableByteChannel | ||
| WritableByteChannel | ||
| FileChannel | ||
| FileChannel$MapMode | ||
| Channels | ||
| Pipe) | ||
| (java.nio.channels.spi | ||
| AbstractSelectableChannel) | ||
| (java.nio.file StandardOpenOption))) | ||
| | ||
| ;;; | ||
| | ||
| | @@ -576,35 +570,57 @@ | |
| ;; file => readable-channel | ||
| (def-conversion ^{:cost 0} [File ReadableByteChannel] | ||
| [file] | ||
| (.getChannel (FileInputStream. file))) | ||
| (-> file | ||
| (.toPath) | ||
| (FileChannel/open (into-array StandardOpenOption | ||
| [StandardOpenOption/READ])))) | ||
| | ||
| ;; file => writable-channel | ||
| (def-conversion ^{:cost 0} [File WritableByteChannel] | ||
| [file {:keys [append?] :or {append? true}}] | ||
| (.getChannel (FileOutputStream. file (boolean append?)))) | ||
| | ||
| (def-conversion ^{:cost 0} [File (seq-of ByteBuffer)] | ||
| (let [option-array (into-array StandardOpenOption | ||
| (cond-> [StandardOpenOption/CREATE | ||
| StandardOpenOption/WRITE] | ||
| append? | ||
| (conj StandardOpenOption/APPEND)))] | ||
| (-> file | ||
| (.toPath) | ||
| (FileChannel/open option-array)))) | ||
| | ||
| ;; | ||
| (def-conversion ^{:cost 0 | ||
| :doc "Assumes the file size is static."} [File (seq-of ByteBuffer)] | ||
| [file {:keys [chunk-size writable?] :or {chunk-size (int 2e9), writable? false}}] | ||
| (let [^RandomAccessFile raf (RandomAccessFile. file (if writable? "rw" "r")) | ||
| ^FileChannel fc (.getChannel raf) | ||
| (let [option-array (into-array StandardOpenOption | ||
| (cond-> [StandardOpenOption/READ] | ||
| writable? | ||
| (conj StandardOpenOption/CREATE | ||
| StandardOpenOption/WRITE))) | ||
| ^FileChannel fc (-> file | ||
| (.toPath) | ||
| (FileChannel/open option-array)) | ||
| close-fn #(.close fc) | ||
| buf-seq (fn buf-seq [offset] | ||
| (when-not (<= (.size fc) offset) | ||
| (let [remaining (- (.size fc) offset)] | ||
| (when (and (.isOpen fc) | ||
| Collaborator There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess adding a Collaborator Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's required, in case the user manually closed the returned closeable-seq. If they close it, that will also .close the RAF and channel, but the seq is still usable, and can be read from up until the point of closing. Attempts to read further would result in Exceptions without checking, this way the seq just ends. | ||
| (> (.size fc) offset)) | ||
| (let [remaining (- (.size fc) offset) | ||
| close-after-reading? (<= remaining chunk-size)] | ||
| (lazy-seq | ||
| (cons | ||
| (.map fc | ||
| (if writable? | ||
| FileChannel$MapMode/READ_WRITE | ||
| FileChannel$MapMode/READ_ONLY) | ||
| offset | ||
| (min remaining chunk-size)) | ||
| (let [mbb (.map fc | ||
| (if writable? | ||
| FileChannel$MapMode/READ_WRITE | ||
| FileChannel$MapMode/READ_ONLY) | ||
| offset | ||
| (min remaining chunk-size))] | ||
| (when close-after-reading? | ||
| (close-fn)) | ||
| Comment on lines +616 to +617 Collaborator There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
To me, as we open the resource, we have the responsibility to close it. Collaborator Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, but that comment's not about responsibility, but default behavior. For a static file, we should close upon reading the last byte, since with the The catch is, what if you're reading a file that's growing, like tailing a log file? You effectively want an infinite seq. As it is now, if the lazy-seq ever catches up to the current end of the file, it'll stop and close, even if the file is still growing. The more I think about it, the more I think there should be an option to turn off auto-closing if you don't want it. Collaborator Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hrrm. Unfortunately, a lot more of the code in that conversion assumes the file is static. In particular, That test also means shrinking or truncating the file will make it fail to .close, though to be honest, a seq across a shrinking file doesn't make logical sense anyway. I think I'll just add a comment for now. This will require reworking to add blocking/async behavior if we wanted it to handle growing files. | ||
| mbb) | ||
| (buf-seq (+ offset chunk-size)))))))] | ||
| (g/closeable-seq | ||
| (buf-seq 0) | ||
| false | ||
| #(do | ||
| (.close raf) | ||
| (.close fc))))) | ||
| close-fn))) | ||
| | ||
| ;; output-stream => writable-channel | ||
| (def-conversion ^{:cost 0} [OutputStream WritableByteChannel] | ||
| | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -194,25 +194,21 @@ | |
| (close-fn) | ||
| nil) | ||
| (reify | ||
| | ||
| clojure.lang.IPending | ||
| (isRealized [_] | ||
| (or | ||
| (not (instance? clojure.lang.IPending s)) | ||
| (realized? s))) | ||
| | ||
| Object | ||
| (finalize [_] | ||
| (close-fn)) | ||
| Comment on lines -205 to -206 Collaborator There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We'll have to move away from finalizers anyway, better today than tomorrow. :) | ||
| | ||
| java.io.Closeable | ||
| (close [_] | ||
| (close-fn)) | ||
| | ||
| clojure.lang.Sequential | ||
| clojure.lang.ISeq | ||
| clojure.lang.Seqable | ||
| (seq [this] this) | ||
| | ||
| clojure.lang.ISeq | ||
| Collaborator There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it expected to have moved this line? Collaborator Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kind of, it throws off kondo when a protocol/interface isn't followed by the methods it declares. Also, the | ||
| (cons [_ a] | ||
| (closeable-seq (cons a s) exhaustible? close-fn)) | ||
| (next [this] | ||
| | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The good news was the previous behavior was fine already [1].
However, the
.getChannelseemed to be present for legacy reason to be able to leveragejava.niofromjava.io. This is a welcome change![1] : https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/sun/nio/ch/FileChannelImpl.java#L201-L208
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, ok. The javadocs said they were synced in terms of things like reading position, but didn't actually say anything specifically about closing, so I didn't want to assume.