(ns tech.v3.libs.arrow-test
  (:require [tech.v3.libs.arrow :as arrow]
            [tech.v3.dataset :as ds]
            [tech.v3.dataset.column :as ds-col]
            [tech.v3.dataset.impl.sparse-column :as sparse-col]
            ;; Required explicitly because set-slf4j-log-level is called below with a
            ;; fully-qualified symbol; do not rely on it being loaded transitively.
            [tech.v3.dataset.utils]
            [tech.v3.datatype.functional :as dfn]
            [tech.v3.datatype :as dtype]
            [tech.v3.libs.parquet]
            [tech.v3.datatype.datetime :as dtype-dt]
            [tech.v3.resource :as resource]
            [clojure.test :refer [deftest is]])
  (:import [java.time LocalTime]
           [tech.v3.dataset Text]
           [java.util Map]
           [java.io ByteArrayOutputStream ByteArrayInputStream]))


(tech.v3.dataset.utils/set-slf4j-log-level :info)


(defn supported-datatype-ds
  "Build a dataset named :testtable with one column per datatype exercised by
  these tests.  `n` sizes every generated column; the zero-arity form uses
  n = 10.

  NOTE(review): the :boolean column is a fixed 10-element vector and does not
  scale with `n` -- confirm whether that is intentional for the n = 1000
  fixtures generated in the comment block below."
  ([n]
   (-> (ds/->dataset {:boolean [true false true true false false true false false true]
                      :bytes (byte-array (range n))
                      :ubytes (dtype/make-container :uint8 (dfn/rem (range n) 256))
                      :shorts (short-array (range n))
                      :ushorts (dtype/make-container :uint16 (range n))
                      :ints (int-array (range n))
                      :uints (dtype/make-container :uint32 (range n))
                      :longs (long-array (range n))
                      :floats (float-array (range n))
                      :doubles (double-array (range n))
                      :strings (map str (range n))
                      :text (map (comp #(Text. %) str) (range n))
                      :instants (repeatedly n dtype-dt/instant)
                      :bigdec (repeatedly n #(BigDecimal/valueOf (+ 100 (rand-int 1700)) 2))
                      ;; :bigint (let [rng (java.util.Random.)]
                      ;;           (repeatedly n #(BigInteger. 256 rng )))

                      ;;external formats often don't support dash-case
                      :local_dates (repeatedly n dtype-dt/local-date)
                      :local_times (repeatedly n dtype-dt/local-time)
                      :uuids (repeatedly n #(java.util.UUID/randomUUID))})
       (vary-meta assoc :name :testtable)))
  ([] (supported-datatype-ds 10)))


;; Scratch forms used to (re)generate and load the checked-in fixture files.
(comment
  (arrow/dataset->stream! (supported-datatype-ds 1000)
                          "test/data/alldtypes.arrow-ipc-lz4"
                          {:compression :lz4})
  (arrow/dataset->stream! (supported-datatype-ds 1000)
                          "test/data/alldtypes.arrow-ipc-zstd"
                          {:compression :zstd})

  (let [sds (supported-datatype-ds 1000)]
    (arrow/dataset-seq->stream! "test/data/alldtypes.arrow-file-zstd"
                                {:compression :zstd
                                 :format :file
                                 :strings-as-text? true}
                                [(ds/select-rows sds (range 500))
                                 ;;test when you have to add more string dictionary values
                                 (ds/select-rows sds (range 500 1000))]))

  (def ignored (arrow/stream->dataset-seq "test/data/alldtypes.arrow-file-zstd"))
  (def ignored (arrow/stream->dataset "test/data/alldtypes.arrow-ipc-zstd"))
  )


(defn- check-roundtrip-columns
  "For every column of `src-ds`, assert that the column of the same name in
  `mmap-ds` and in `copy-ds` has the same elemwise datatype and the same
  values.  Shared by the round-trip tests below so they all make exactly the
  same assertions with the same failure messages."
  [src-ds mmap-ds copy-ds]
  (doseq [col (vals src-ds)]
    (let [cname ((meta col) :name)
          dt (dtype/elemwise-datatype col)
          inp-col (mmap-ds cname)
          cp-col (copy-ds cname)]
      (is (= dt (dtype/elemwise-datatype inp-col)) (str "inplace failure " cname))
      (is (= dt (dtype/elemwise-datatype cp-col)) (str "copy failure " cname))
      (is (= (vec col) (vec inp-col)) (str "inplace failure " cname))
      (is (= (vec col) (vec cp-col)) (str "copy failure " cname)))))


;; Writes the all-datatypes dataset to an arrow file and reads it back both
;; memory-mapped and copying, comparing every column against the source.
(deftest base-datatype-test
  (try
    (resource/stack-resource-context
     (let [ds (supported-datatype-ds)
           _ (arrow/dataset->stream! ds "alldtypes.arrow")
           mmap-ds (arrow/stream->dataset "alldtypes.arrow" {:open-type :mmap
                                                             :key-fn keyword})
           copy-ds (arrow/stream->dataset "alldtypes.arrow" {:key-fn keyword})]
       (check-roundtrip-columns ds mmap-ds copy-ds)))
    (finally
      (.delete (java.io.File. "alldtypes.arrow")))))


;; Same round trip as base-datatype-test but on the sparse form of the dataset;
;; additionally asserts that every column is sparse before and after the trip.
(deftest base-sparse-datatype-test
  (try
    (resource/stack-resource-context
     (let [ds (sparse-col/->sparse-ds (supported-datatype-ds) 0.0)
           _ (arrow/dataset->stream! ds "alldtypes-sparse.arrow")
           mmap-ds (arrow/stream->dataset "alldtypes-sparse.arrow" {:open-type :mmap
                                                                    :key-fn keyword})
           copy-ds (arrow/stream->dataset "alldtypes-sparse.arrow" {:key-fn keyword})]
       (is (every? sparse-col/is-sparse? (.values ^Map ds)))
       (is (every? sparse-col/is-sparse? (.values ^Map mmap-ds)))
       (is (every? sparse-col/is-sparse? (.values ^Map copy-ds)))
       (check-roundtrip-columns ds mmap-ds copy-ds)))
    (finally
      (.delete (java.io.File. "alldtypes-sparse.arrow")))))


;; Loads the checked-in feather fixtures and checks their row counts.
(deftest arrow-file-types
  ;;lz4 compression
  (let [all-files ["test/data/alldtypes.arrow-feather" ;lz4
                   "test/data/alldtypes.arrow-feather-compressed" ;zstd
                   "test/data/alldtypes.arrow-feather-v1" ;v1
                   ]]
    (doseq [file all-files]
      (is (= 1000 (ds/row-count (arrow/stream->dataset file)))))
    ;; lz4 with dependent frames
    (is (= 31962 (ds/row-count (arrow/stream->dataset
                                "test/data/tweets_sentiment.feather"))))))


;; Writes three copies of the dataset as a sequence, reads the sequence back
;; memory-mapped and copying, and compares the last dataset of each against
;; the source.
(deftest base-ds-seq-test
  (try
    (let [ds (supported-datatype-ds)
          _ (arrow/dataset-seq->stream! "alldtypes-seq.arrow"
                                        {:strings-as-text? false}
                                        [ds ds ds])
          mmap-ds-seq (arrow/stream->dataset-seq "alldtypes-seq.arrow"
                                                 {:key-fn keyword
                                                  :open-type :mmap})
          copy-ds-seq (arrow/stream->dataset-seq "alldtypes-seq.arrow"
                                                 {:key-fn keyword})]
      (is (= 3 (count mmap-ds-seq)))
      (is (= 3 (count copy-ds-seq)))
      (check-roundtrip-columns ds (last mmap-ds-seq) (last copy-ds-seq)))
    (finally
      (.delete (java.io.File. "alldtypes-seq.arrow")))))


;; Round-trips stocks.csv through arrow and compares it, plus the pyarrow-written
;; fixture, via both the copying and the memory-mapped read paths.
(deftest simple-stocks
  (try
    (let [stocks (ds/->dataset "test/data/stocks.csv")
          _ (arrow/dataset->stream! stocks "temp.stocks.arrow")
          stocks-copying (arrow/stream->dataset "temp.stocks.arrow")
          stocks-inplace (arrow/stream->dataset "temp.stocks.arrow"
                                                {:open-type :mmap})
          pystocks-copying (arrow/stream->dataset "test/data/stocks.pyarrow.stream")
          ;; Was previously loaded exactly like pystocks-copying, so the
          ;; "inplace" assertions never exercised the mmap path.
          pystocks-inplace (arrow/stream->dataset "test/data/stocks.pyarrow.stream"
                                                  {:open-type :mmap})]
      ;;This is here just to make sure that the data isn't cleaned up until it
      ;;actually can safely be cleaned up.  This was a bug that caused datatype to
      ;;bump from 5.11 to 5.12
      (System/gc)
      (is (dfn/equals (stocks "price") (stocks-copying "price")))
      (is (dfn/equals (stocks "price") (stocks-inplace "price")))
      (is (dfn/equals (stocks "price") (pystocks-copying "price")))
      (is (dfn/equals (stocks "price") (pystocks-inplace "price")))

      (is (= (vec (stocks "symbol")) (vec (stocks-copying "symbol"))))
      (is (= (vec (stocks "symbol")) (vec (stocks-inplace "symbol"))))
      ;;python saves strings inline in the file - equivalent to :strings-as-text?
      ;;save option
      (is (= (vec (stocks "symbol")) (mapv str (pystocks-copying "symbol"))))
      (is (= (vec (stocks "symbol")) (mapv str (pystocks-inplace "symbol")))))
    (finally
      (.delete (java.io.File. "temp.stocks.arrow")))))


;; Round-trips the ames training csv through arrow and checks SalePrice values
;; and the LotFrontage missing set for our own file and the pyarrow fixture,
;; via both the copying and the memory-mapped read paths.
(deftest ames-house-prices
  (try
    (let [ames (ds/->dataset "test/data/ames-house-prices/train.csv")
          _ (arrow/dataset->stream! ames "temp.ames.arrow")
          ames-copying (arrow/stream->dataset "temp.ames.arrow")
          ames-inplace (arrow/stream->dataset "temp.ames.arrow" {:open-type :mmap})
          pyames-copying (arrow/stream->dataset "test/data/ames.pyarrow.stream")
          ;; Was previously loaded exactly like pyames-copying, so the
          ;; "inplace" assertions never exercised the mmap path.
          pyames-inplace (arrow/stream->dataset "test/data/ames.pyarrow.stream"
                                                {:open-type :mmap})]
      (System/gc)
      (is (dfn/equals (ames "SalePrice") (ames-copying "SalePrice")))
      (is (dfn/equals (ames "SalePrice") (ames-inplace "SalePrice")))
      (is (= (ds-col/missing (ames "LotFrontage"))
             (ds-col/missing (ames-copying "LotFrontage"))))
      (is (= (ds-col/missing (ames "LotFrontage"))
             (ds-col/missing (ames-inplace "LotFrontage"))))
      ;; Guards against the missing-set comparisons passing on empty sets.
      (is (not= 0 (dtype/ecount (ds-col/missing (ames-inplace "LotFrontage")))))
      (is (dfn/equals (ames "SalePrice") (pyames-copying "SalePrice")))
      (is (dfn/equals (ames "SalePrice") (pyames-inplace "SalePrice")))
      (is (= (ds-col/missing (ames "LotFrontage"))
             (ds-col/missing (pyames-copying "LotFrontage"))))
      (is (= (ds-col/missing (ames "LotFrontage"))
             (ds-col/missing (pyames-inplace "LotFrontage")))))
    (finally
      (.delete (java.io.File. "temp.ames.arrow")))))


;; Writes the ames dataset uncompressed, zstd, lz4 and sparse+zstd, prints the
;; resulting file sizes, then checks that each variant reads back equal to the
;; uncompressed one.
(deftest ames-compression-test
  (try
    (let [ames (ds/->dataset "test/data/ames-house-prices/train.csv")
          _ (arrow/dataset->stream! ames "ames-uncompressed.arrow")
          _ (arrow/dataset->stream! ames "ames-zstd.arrow"
                                    {:compression {:compression-type :zstd
                                                   ;;default is 3
                                                   :level 5}})
          _ (arrow/dataset->stream! ames "ames-lz4.arrow" {:compression :lz4})
          _ (arrow/dataset->stream! (sparse-col/->sparse-ds ames)
                                    "ames-sparse-zstd.arrow"
                                    {:compression {:compression-type :zstd
                                                   ;;default is 3
                                                   :level 5}})
          file-len (fn [path] (.length (java.io.File. (str path))))
          _ (println (ds/->dataset {:save-type [:uncompressed :zstd :sparse-zstd :lz4]
                                    :file-size [(file-len "ames-uncompressed.arrow")
                                                (file-len "ames-zstd.arrow")
                                                (file-len "ames-sparse-zstd.arrow")
                                                (file-len "ames-lz4.arrow")]}))
          uncomp (arrow/stream->dataset "ames-uncompressed.arrow")
          zstd (arrow/stream->dataset "ames-zstd.arrow")
          sparse-zstd (arrow/stream->dataset "ames-sparse-zstd.arrow")
          lz4 (arrow/stream->dataset "ames-lz4.arrow")]
      (System/gc)
      (is (dfn/equals (uncomp "SalePrice") (zstd "SalePrice")))
      (is (dfn/equals (uncomp "LotFrontage") (sparse-zstd "LotFrontage")))
      (is (dfn/equals (uncomp "SalePrice") (lz4 "SalePrice"))))
    (finally
      (.delete (java.io.File. "ames-uncompressed.arrow"))
      (.delete (java.io.File. "ames-zstd.arrow"))
      (.delete (java.io.File. "ames-sparse-zstd.arrow"))
      (.delete (java.io.File. "ames-lz4.arrow")))))


;; Reads the date fixture twice: once with :integer-datetime-types? true
;; (expects :epoch-days integers) and once with default options (expects
;; :packed-local-date values).
(deftest date-arrow-test
  (let [date-data (arrow/read-stream-dataset-copying "test/data/with_date.arrow"
                                                     {:integer-datetime-types? true})]
    (is (= [18070 18072 18063] (date-data "date")))
    (is (= :epoch-days (dtype/elemwise-datatype (date-data "date")))))
  (let [date-data (arrow/read-stream-dataset-copying "test/data/with_date.arrow")]
    (is (= (mapv #(java.time.LocalDate/parse %)
                 ["2019-06-23" "2019-06-25" "2019-06-16"])
           (date-data "date")))
    (is (= :packed-local-date (dtype/elemwise-datatype (date-data "date"))))))


;; Loads a parquet fixture, writes it to arrow and checks the missing set
;; survives the round trip.
(deftest odd-parquet-crash
  (let [test-data (ds/->dataset "test/data/part-00000-74d3eb51-bc9c-4ba5-9d13-9e0d71eea31f.c000.snappy.parquet")]
    (try
      (arrow/write-dataset-to-stream! test-data "test.arrow")
      (let [arrow-ds (arrow/read-stream-dataset-copying "test.arrow")]
        (is (= (ds/missing test-data)
               (ds/missing arrow-ds))))
      (finally
        (.delete (java.io.File. "test.arrow"))))))


;; Copying and inplace reads of the ipc_stream fixture must agree on column names.
(deftest failed-R-file
  (let [cp-data (arrow/read-stream-dataset-copying "test/data/part-8981.ipc_stream")
        inp-data (arrow/read-stream-dataset-inplace "test/data/part-8981.ipc_stream")]
    (is (= (vec (ds/column-names cp-data))
           (vec (ds/column-names inp-data))))))


;; Copying and inplace reads of the largeVarChar fixture must agree on column
;; names and on the contents of the first column.
(deftest large-var-char-file
  (let [cp-data (arrow/read-stream-dataset-copying "test/data/largeVarChar.ipc")
        inp-data (arrow/read-stream-dataset-inplace "test/data/largeVarChar.ipc")]
    (is (= (vec (ds/column-names cp-data))
           (vec (ds/column-names inp-data))))
    (is (= (vec (first (ds/columns cp-data)))
           (vec (first (ds/columns inp-data)))))))


;; Checks uuid columns: first reading the uuid_ext.arrow fixture, then
;; round-tripping a parquet-loaded uuid column through arrow.
(deftest uuid-test
  (let [py-uuid (ds/->dataset "test/data/uuid_ext.arrow"
                              {:key-fn keyword})]
    (is (= :uuid (dtype/elemwise-datatype (py-uuid :id))))
    (is (= (mapv #(java.util.UUID/fromString %)
                 ["8be643d6-0df7-4e5e-837c-f94170c87914"
                  "24bc9cf4-e2e8-444f-bb2d-82394f33ff76"
                  "e8149e1b-aef6-4671-b1b4-3b7a21eed92a"])
           (py-uuid :id))))
  (try
    (let [uuid-ds (ds/->dataset "test/data/uuid.parquet"
                                {:parser-fn {"uuids" :uuid}})
          _ (arrow/write-dataset-to-stream! uuid-ds "test-uuid.arrow")
          copying-ds (arrow/read-stream-dataset-copying "test-uuid.arrow")
          inplace-ds (arrow/read-stream-dataset-inplace "test-uuid.arrow")]
      (is (= :uuid ((comp :datatype meta) (copying-ds "uuids"))))
      (is (= :uuid ((comp :datatype meta) (inplace-ds "uuids"))))
      (is (= (vec (copying-ds "uuids"))
             (vec (inplace-ds "uuids"))))
      (is (= (vec (uuid-ds "uuids"))
             (vec (copying-ds "uuids")))))
    (finally
      (.delete (java.io.File. "test-uuid.arrow")))))


;; Round-trips a LocalTime column and checks it reads back as
;; :packed-local-time with equal values on both read paths.
(deftest local-time
  (try
    (let [ds (ds/->dataset {"a" (range 10)
                            "b" (repeat 10 (java.time.LocalTime/now))})
          _ (arrow/write-dataset-to-stream! ds "test-local-time.arrow")
          copying-ds (arrow/read-stream-dataset-copying "test-local-time.arrow")
          inplace-ds (arrow/read-stream-dataset-inplace "test-local-time.arrow")]
      (is (= :packed-local-time (dtype/elemwise-datatype (copying-ds "b"))))
      (is (= :packed-local-time (dtype/elemwise-datatype (inplace-ds "b"))))
      (is (= (vec (copying-ds "b"))
             (vec (inplace-ds "b"))))
      ;;Making a primitive container will use the packed data.
      (is (= (vec (ds "b"))
             (vec (copying-ds "b")))))
    (finally
      (.delete (java.io.File. "test-local-time.arrow")))))


;; Writes a one-row string dataset to an in-memory stream with lz4, reads it
;; back, writes the result again and checks the final read matches the source.
(deftest string-arrow
  (let [dataset (ds/->dataset [{"col1" "a"}] {:parser-fn :string})
        baos (ByteArrayOutputStream.)]
    (resource/stack-resource-context
     (arrow/dataset->stream! dataset baos {:compression :lz4})
     (let [written-bytes (.toByteArray baos)
           arrow-ds-rtt (arrow/stream->dataset written-bytes)
           ;; Reuse the same stream for the second write.
           _ (.reset baos)
           _ (arrow/dataset->stream! arrow-ds-rtt baos {:compression :lz4})
           b2 (.toByteArray baos)
           final-ds (arrow/stream->dataset b2)]
       (is (= (vec (dataset "col1"))
              (vec (final-ds "col1"))))))))


;; Every row index of the "nullcol" column must be reported as missing.
(deftest nullcol
  (let [ds (arrow/stream->dataset "test/data/withnullcol.arrow")]
    (is (= (vec (range (ds/row-count ds)))
           (vec (ds/missing (ds "nullcol")))))))


;; Reads a list-typed column from the arrow_list fixture.
(deftest list-datatypes-read-only
  (let [ds (ds/->dataset "test/data/arrow_list.arrow")]
    (is (= [["dog" "car"] ["dog" "flower"] ["car" "flower"]]
           (mapv vec (ds "class-name"))))))


;; Reading the empty.arrow fixture is expected to return nil.
(deftest empty-array-dataset
  (is (nil? (arrow/stream->dataset "test/data/empty.arrow"))))