Files
2026-02-08 11:20:43 -10:00

355 lines
16 KiB
Clojure
Vendored

(ns tech.v3.libs.arrow-test
(:require [tech.v3.libs.arrow :as arrow]
[tech.v3.dataset :as ds]
[tech.v3.dataset.column :as ds-col]
[tech.v3.dataset.impl.sparse-column :as sparse-col]
[tech.v3.datatype.functional :as dfn]
[tech.v3.datatype :as dtype]
[tech.v3.libs.parquet]
[tech.v3.datatype.datetime :as dtype-dt]
[tech.v3.resource :as resource]
[clojure.test :refer [deftest is]])
(:import [java.time LocalTime]
[tech.v3.dataset Text]
[java.util Map]
[java.io ByteArrayOutputStream ByteArrayInputStream]))
(tech.v3.dataset.utils/set-slf4j-log-level :info)
(defn supported-datatype-ds
([n]
(-> (ds/->dataset {:boolean [true false true true false false true false false true]
:bytes (byte-array (range n))
:ubytes (dtype/make-container :uint8 (dfn/rem (range n) 256))
:shorts (short-array (range n))
:ushorts (dtype/make-container :uint16 (range n))
:ints (int-array (range n))
:uints (dtype/make-container :uint32 (range n))
:longs (long-array (range n))
:floats (float-array (range n))
:doubles (double-array (range n))
:strings (map str (range n))
:text (map (comp #(Text. %) str) (range n))
:instants (repeatedly n dtype-dt/instant)
:bigdec (repeatedly n #(BigDecimal/valueOf (+ 100 (rand-int 1700)) 2))
;; :bigint (let [rng (java.util.Random.)]
;; (repeatedly n #(BigInteger. 256 rng )))
;;external formats often don't support dash-case
:local_dates (repeatedly n dtype-dt/local-date)
:local_times (repeatedly n dtype-dt/local-time)
:uuids (repeatedly n #(java.util.UUID/randomUUID))})
(vary-meta assoc :name :testtable)))
([]
(supported-datatype-ds 10)))
(comment
(arrow/dataset->stream! (supported-datatype-ds 1000) "test/data/alldtypes.arrow-ipc-lz4"
{:compression :lz4})
(arrow/dataset->stream! (supported-datatype-ds 1000) "test/data/alldtypes.arrow-ipc-zstd"
{:compression :zstd})
(let [sds (supported-datatype-ds 1000)]
(arrow/dataset-seq->stream! "test/data/alldtypes.arrow-file-zstd"
{:compression :zstd
:format :file
:strings-as-text? true}
[(ds/select-rows sds (range 500))
;;test when you have to add more string dictionary values
(ds/select-rows sds (range 500 1000))]))
(def ignored (arrow/stream->dataset-seq "test/data/alldtypes.arrow-file-zstd"))
(def ignored (arrow/stream->dataset "test/data/alldtypes.arrow-ipc-zstd"))
)
(deftest base-datatype-test
(try
(resource/stack-resource-context
(let [ds (supported-datatype-ds)
_ (arrow/dataset->stream! ds "alldtypes.arrow")
mmap-ds (arrow/stream->dataset "alldtypes.arrow" {:open-type :mmap
:key-fn keyword})
copy-ds (arrow/stream->dataset "alldtypes.arrow" {:key-fn keyword})]
(doseq [col (vals ds)]
(let [cname ((meta col) :name)
dt (dtype/elemwise-datatype col)
inp-col (mmap-ds cname)
cp-col (copy-ds cname)]
(is (= dt (dtype/elemwise-datatype inp-col)) (str "inplace failure " cname))
(is (= dt (dtype/elemwise-datatype cp-col)) (str "copy failure " cname))
(is (= (vec col) (vec inp-col)) (str "inplace failure " cname))
(is (= (vec col) (vec cp-col)) (str "copy failure " cname))))))
(finally
(.delete (java.io.File. "alldtypes.arrow")))))
(deftest base-sparse-datatype-test
(try
(resource/stack-resource-context
(let [ds (sparse-col/->sparse-ds (supported-datatype-ds) 0.0)
_ (arrow/dataset->stream! ds "alldtypes-sparse.arrow")
mmap-ds (arrow/stream->dataset "alldtypes-sparse.arrow" {:open-type :mmap
:key-fn keyword})
copy-ds (arrow/stream->dataset "alldtypes-sparse.arrow" {:key-fn keyword})]
(is (every? sparse-col/is-sparse? (.values ^Map ds)))
(is (every? sparse-col/is-sparse? (.values ^Map mmap-ds)))
(is (every? sparse-col/is-sparse? (.values ^Map copy-ds)))
(doseq [col (vals ds)]
(let [cname ((meta col) :name)
dt (dtype/elemwise-datatype col)
inp-col (mmap-ds cname)
cp-col (copy-ds cname)]
(is (= dt (dtype/elemwise-datatype inp-col)) (str "inplace failure " cname))
(is (= dt (dtype/elemwise-datatype cp-col)) (str "copy failure " cname))
(is (= (vec col) (vec inp-col)) (str "inplace failure " cname))
(is (= (vec col) (vec cp-col)) (str "copy failure " cname))))))
(finally
(.delete (java.io.File. "alldtypes-sparse.arrow")))))
(deftest arrow-file-types
;;lz4 compression
(let [all-files ["test/data/alldtypes.arrow-feather" ;lz4
"test/data/alldtypes.arrow-feather-compressed" ;zstd
"test/data/alldtypes.arrow-feather-v1" ;v1
]]
(doseq [file all-files]
(is (= 1000 (ds/row-count (arrow/stream->dataset file)))))
;; lz4 with dependent frames))))))
(is (= 31962 (ds/row-count (arrow/stream->dataset "test/data/tweets_sentiment.feather"))))))
(deftest base-ds-seq-test
(try
(let [ds (supported-datatype-ds)
_ (arrow/dataset-seq->stream! "alldtypes-seq.arrow" {:strings-as-text? false} [ds ds ds])
mmap-ds-seq (arrow/stream->dataset-seq "alldtypes-seq.arrow" {:key-fn keyword
:open-type :mmap})
copy-ds-seq (arrow/stream->dataset-seq "alldtypes-seq.arrow" {:key-fn keyword})]
(is (= 3 (count mmap-ds-seq)))
(is (= 3 (count copy-ds-seq)))
(let [mmap-ds (last mmap-ds-seq)
copy-ds (last copy-ds-seq)]
(doseq [col (vals ds)]
(let [cname ((meta col) :name)
dt (dtype/elemwise-datatype col)
inp-col (mmap-ds cname)
cp-col (copy-ds cname)]
(is (= dt (dtype/elemwise-datatype inp-col)) (str "inplace failure " cname))
(is (= dt (dtype/elemwise-datatype cp-col)) (str "copy failure " cname))
(is (= (vec col) (vec inp-col)) (str "inplace failure " cname))
(is (= (vec col) (vec cp-col)) (str "copy failure " cname))))))
(finally
(.delete (java.io.File. "alldtypes-seq.arrow")))))
(deftest simple-stocks
(try
(let [stocks (ds/->dataset "test/data/stocks.csv")
_ (arrow/dataset->stream! stocks "temp.stocks.arrow")
stocks-copying (arrow/stream->dataset "temp.stocks.arrow")
stocks-inplace (arrow/stream->dataset "temp.stocks.arrow" {:open-type :mmap})
pystocks-copying (arrow/stream->dataset "test/data/stocks.pyarrow.stream")
pystocks-inplace (arrow/stream->dataset "test/data/stocks.pyarrow.stream")]
;;This is here just to make sure that the data isn't cleaned up until it
;;actually can safely be cleaned up. This was a bug that caused datatype to
;;bump from 5.11 to 5.12
(System/gc)
(is (dfn/equals (stocks "price") (stocks-copying "price")))
(is (dfn/equals (stocks "price") (stocks-inplace "price")))
(is (dfn/equals (stocks "price") (pystocks-copying "price")))
(is (dfn/equals (stocks "price") (pystocks-inplace "price")))
(is (= (vec (stocks "symbol")) (vec (stocks-copying "symbol"))))
(is (= (vec (stocks "symbol")) (vec (stocks-inplace "symbol"))))
;;python saves strings inline in the file - equivalent to :strings-as-text?
;;save option
(is (= (vec (stocks "symbol")) (mapv str (pystocks-copying "symbol"))))
(is (= (vec (stocks "symbol")) (mapv str (pystocks-inplace "symbol")))))
(finally
(.delete (java.io.File. "temp.stocks.arrow")))))
(deftest ames-house-prices
(try
(let [ames (ds/->dataset "test/data/ames-house-prices/train.csv")
_ (arrow/dataset->stream! ames "temp.ames.arrow")
ames-copying (arrow/stream->dataset "temp.ames.arrow")
ames-inplace (arrow/stream->dataset "temp.ames.arrow" {:open-type :mmap})
pyames-copying (arrow/stream->dataset "test/data/ames.pyarrow.stream")
pyames-inplace (arrow/stream->dataset "test/data/ames.pyarrow.stream")]
(System/gc)
(is (dfn/equals (ames "SalePrice") (ames-copying "SalePrice")))
(is (dfn/equals (ames "SalePrice") (ames-inplace "SalePrice")))
(is (= (ds-col/missing (ames "LotFrontage"))
(ds-col/missing (ames-copying "LotFrontage"))))
(is (= (ds-col/missing (ames "LotFrontage"))
(ds-col/missing (ames-inplace "LotFrontage"))))
(is (not= 0 (dtype/ecount (ds-col/missing (ames-inplace "LotFrontage")))))
(is (dfn/equals (ames "SalePrice") (pyames-copying "SalePrice")))
(is (dfn/equals (ames "SalePrice") (pyames-inplace "SalePrice")))
(is (= (ds-col/missing (ames "LotFrontage"))
(ds-col/missing (pyames-copying "LotFrontage"))))
(is (= (ds-col/missing (ames "LotFrontage"))
(ds-col/missing (pyames-inplace "LotFrontage")))))
(finally
(.delete (java.io.File. "temp.ames.arrow")))))
(deftest ames-compression-test
(try
(let [ames (ds/->dataset "test/data/ames-house-prices/train.csv")
_ (arrow/dataset->stream! ames "ames-uncompressed.arrow")
_ (arrow/dataset->stream! ames "ames-zstd.arrow" {:compression
{:compression-type :zstd
;;default is 3
:level 5}})
_ (arrow/dataset->stream! ames "ames-lz4.arrow" {:compression :lz4})
_ (arrow/dataset->stream! (sparse-col/->sparse-ds ames)
"ames-sparse-zstd.arrow" {:compression
{:compression-type :zstd
;;default is 3
:level 5}})
file-len (fn [path] (.length (java.io.File. (str path))))
_ (println (ds/->dataset {:save-type [:uncompressed :zstd :sparse-zstd :lz4]
:file-size [(file-len "ames-uncompressed.arrow")
(file-len "ames-zstd.arrow")
(file-len "ames-sparse-zstd.arrow")
(file-len "ames-lz4.arrow")]}))
uncomp (arrow/stream->dataset "ames-uncompressed.arrow")
zstd (arrow/stream->dataset "ames-zstd.arrow")
sparse-zstd (arrow/stream->dataset "ames-sparse-zstd.arrow")
lz4 (arrow/stream->dataset "ames-lz4.arrow")]
(System/gc)
(is (dfn/equals (uncomp "SalePrice") (zstd "SalePrice")))
(is (dfn/equals (uncomp "LotFrontage") (sparse-zstd "LotFrontage")))
(is (dfn/equals (uncomp "SalePrice") (lz4 "SalePrice"))))
(finally
(.delete (java.io.File. "ames-uncompressed.arrow"))
(.delete (java.io.File. "ames-zstd.arrow"))
(.delete (java.io.File. "ames-sparse-zstd.arrow"))
(.delete (java.io.File. "ames-lz4.arrow")))))
(deftest date-arrow-test
(let [date-data (arrow/read-stream-dataset-copying "test/data/with_date.arrow"
{:integer-datetime-types? true})]
(is (= [18070 18072 18063]
(date-data "date")))
(is (= :epoch-days (dtype/elemwise-datatype (date-data "date")))))
(let [date-data (arrow/read-stream-dataset-copying "test/data/with_date.arrow")]
(is (= (mapv #(java.time.LocalDate/parse %)
["2019-06-23" "2019-06-25" "2019-06-16"])
(date-data "date")))
(is (= :packed-local-date (dtype/elemwise-datatype (date-data "date"))))))
(deftest odd-parquet-crash
(let [test-data (ds/->dataset "test/data/part-00000-74d3eb51-bc9c-4ba5-9d13-9e0d71eea31f.c000.snappy.parquet")]
(try
(arrow/write-dataset-to-stream! test-data "test.arrow")
(let [arrow-ds (arrow/read-stream-dataset-copying "test.arrow")]
(is (= (ds/missing test-data)
(ds/missing arrow-ds))))
(finally
(.delete (java.io.File. "test.arrow"))))))
(deftest failed-R-file
(let [cp-data (arrow/read-stream-dataset-copying "test/data/part-8981.ipc_stream")
inp-data (arrow/read-stream-dataset-inplace "test/data/part-8981.ipc_stream")]
(is (= (vec (ds/column-names cp-data))
(vec (ds/column-names inp-data))))))
(deftest large-var-char-file
(let [cp-data (arrow/read-stream-dataset-copying "test/data/largeVarChar.ipc")
inp-data (arrow/read-stream-dataset-inplace "test/data/largeVarChar.ipc")]
(is (= (vec (ds/column-names cp-data))
(vec (ds/column-names inp-data))))
(is (= (vec (first (ds/columns cp-data)))
(vec (first (ds/columns inp-data)))))))
(deftest uuid-test
(let [py-uuid (ds/->dataset "test/data/uuid_ext.arrow" {:key-fn keyword})]
(is (= :uuid (dtype/elemwise-datatype (py-uuid :id))))
(is (= (mapv #(java.util.UUID/fromString %)
["8be643d6-0df7-4e5e-837c-f94170c87914"
"24bc9cf4-e2e8-444f-bb2d-82394f33ff76"
"e8149e1b-aef6-4671-b1b4-3b7a21eed92a"])
(py-uuid :id))))
(try
(let [uuid-ds (ds/->dataset "test/data/uuid.parquet"
{:parser-fn {"uuids" :uuid}})
_ (arrow/write-dataset-to-stream! uuid-ds "test-uuid.arrow")
copying-ds (arrow/read-stream-dataset-copying "test-uuid.arrow")
inplace-ds (arrow/read-stream-dataset-inplace "test-uuid.arrow")]
(is (= :uuid ((comp :datatype meta) (copying-ds "uuids"))))
(is (= :uuid ((comp :datatype meta) (inplace-ds "uuids"))))
(is (= (vec (copying-ds "uuids"))
(vec (inplace-ds "uuids"))))
(is (= (vec (uuid-ds "uuids"))
(vec (copying-ds "uuids")))))
(finally
(.delete (java.io.File. "test-uuid.arrow")))))
(deftest local-time
(try
(let [ds (ds/->dataset {"a" (range 10)
"b" (repeat 10 (java.time.LocalTime/now))})
_ (arrow/write-dataset-to-stream! ds "test-local-time.arrow")
copying-ds (arrow/read-stream-dataset-copying "test-local-time.arrow")
inplace-ds (arrow/read-stream-dataset-inplace "test-local-time.arrow")]
(is (= :packed-local-time (dtype/elemwise-datatype (copying-ds "b"))))
(is (= :packed-local-time (dtype/elemwise-datatype (inplace-ds "b"))))
(is (= (vec (copying-ds "b"))
(vec (inplace-ds "b"))))
;;Making a primitive container will use the packed data.
(is (= (vec (ds "b"))
(vec (copying-ds "b")))))
(finally
(.delete (java.io.File. "test-local-time.arrow")))))
(deftest string-arrow
(let [dataset (ds/->dataset [{"col1" "a"}] {:parser-fn :string})
baos (ByteArrayOutputStream.)]
(resource/stack-resource-context
(arrow/dataset->stream! dataset baos {:compression :lz4})
(let [written-bytes (.toByteArray baos)
arrow-ds-rtt (arrow/stream->dataset written-bytes)
_ (.reset baos)
_ (arrow/dataset->stream! arrow-ds-rtt baos {:compression :lz4})
b2 (.toByteArray baos)
final-ds (arrow/stream->dataset b2)]
(is (= (vec (dataset "col1"))
(vec (final-ds "col1"))))))))
(deftest nullcol
(let [ds (arrow/stream->dataset "test/data/withnullcol.arrow")]
(is (= (vec (range (ds/row-count ds)))
(vec (ds/missing (ds "nullcol")))))))
(deftest list-datatypes-read-only
(let [ds (ds/->dataset "test/data/arrow_list.arrow")]
(is (= [["dog" "car"]
["dog" "flower"]
["car" "flower"]]
(mapv vec (ds "class-name"))))))
(deftest empty-array-dataset
(is (nil? (arrow/stream->dataset "test/data/empty.arrow"))))