API Reference

(ns windowed-dataset.api-reference
  (:require [scicloj.windowed-dataset.api :as wd]
            [tablecloth.api :as tc]
            [java-time.api :as java-time]
            [clojure.string :as str]
            [scicloj.kindly.v4.api :as kindly]
            [scicloj.kindly.v4.kind :as kind]))

WindowedDataset Record

The WindowedDataset record implements a circular buffer data structure optimized for time-series analysis:

(defrecord WindowedDataset
          [dataset           ; tech.v3.dataset containing the actual data
           column-types      ; map of column names to data types
           max-size         ; maximum number of rows the buffer can hold
           current-size     ; current number of rows (0 to max-size)
           current-position]) ; current write position (circular index)

Key characteristics:

  • Mutable - Inserts modify the underlying dataset in place, for performance in streaming scenarios
  • Fixed memory - Pre-allocates space for predictable memory usage
  • Circular buffer - Automatically overwrites oldest data when full
  • Chronological access - Data is always returned in insertion order
  • Zero-copy views - Time windows are extracted without copying data

Typical workflow:

  1. Create with make-windowed-dataset specifying column types and buffer size
  2. Insert streaming data with insert-to-windowed-dataset!
  3. Extract time windows with windowed-dataset->time-window-dataset
  4. Compute metrics over specific time periods

WindowedDataset Structure Example

(let [;; Create a windowed dataset to examine its structure
      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :value :float64} 3)
      base-time (java-time/instant)

      ;; Add one data point to see the structure
      wd-with-data (wd/insert-to-windowed-dataset! windowed-ds {:timestamp base-time :value 42.5})]

  ;; **WindowedDataset Record Fields:**
  {:dataset "tech.v3.dataset (Internal data storage)"
   :column-types (:column-types wd-with-data)
   :max-size (:max-size wd-with-data)
   :current-size (:current-size wd-with-data)
   :current-position (:current-position wd-with-data)})
{:dataset "tech.v3.dataset (Internal data storage)",
 :column-types {:timestamp :instant, :value :float64},
 :max-size 3,
 :current-size 1,
 :current-position 1}

Circular Buffer Behavior

(let [;; Demonstrate circular buffer behavior
      small-wd (wd/make-windowed-dataset {:value :int32} 3)

      ;; Fill beyond capacity to show circular behavior
      test-data (map (fn [i] {:value i}) (range 5))
      final-wd (reduce wd/insert-to-windowed-dataset! small-wd test-data)]

  ;; **Circular Buffer Example (capacity: 3, inserted: 5 values):**
  ;; Final state: size=3, position=2 (values 0,1 were overwritten by 3,4)
  ;; **Data in chronological order:**
  (wd/windowed-dataset->dataset final-wd))

_unnamed [3 1]:

:value
2
3
4
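
The wrap-around in this example can be traced with plain index arithmetic. The following is a minimal sketch of a circular buffer, not the library's implementation: it models the buffer as an immutable map (the library mutates its dataset in place), and the names empty-buffer and insert-row are hypothetical.

```clojure
;; Hypothetical, immutable model of a circular buffer (illustration only).
(defn empty-buffer
  "Return an empty buffer model with max-size slots."
  [max-size]
  {:slots (vec (repeat max-size nil))
   :max-size max-size
   :current-size 0
   :current-position 0})

(defn insert-row
  "Write row at the current position, then advance the position,
   wrapping to 0 once the end of the buffer is reached."
  [{:keys [max-size current-size current-position] :as buffer} row]
  (-> buffer
      (assoc-in [:slots current-position] row)
      (assoc :current-size (min (inc current-size) max-size)
             :current-position (mod (inc current-position) max-size))))

;; Capacity 3, five inserts: 3 and 4 overwrite the slots that held 0 and 1.
(def final-buffer
  (reduce insert-row (empty-buffer 3) (map (fn [i] {:value i}) (range 5))))

(select-keys final-buffer [:slots :current-size :current-position])
;; => {:slots [{:value 3} {:value 4} {:value 2}], :current-size 3, :current-position 2}
```

The slots are stored in physical order (3, 4, 2), which is why a separate index order is needed to read them back chronologically (2, 3, 4).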

make-windowed-dataset

[column-types max-size]

Create an empty WindowedDataset with the given column types and maximum size.

Args:

  • column-types - a map from column name to column type (for example :instant, :float64 or :string)
  • max-size - the maximum number of rows the window keeps

Returns: A new, empty WindowedDataset (current-size and current-position both start at 0).

Example

(let [;; Create a windowed dataset for sensor data with 10-sample capacity
      column-spec {:timestamp :instant
                   :temperature :float64
                   :sensor-id :string}
      windowed-ds (wd/make-windowed-dataset column-spec 10)]

  ;; **Created windowed dataset:**
  {:max-size (:max-size windowed-ds)
   :current-size (:current-size windowed-ds)
   :current-position (:current-position windowed-ds)
   :column-types (:column-types windowed-ds)})
{:max-size 10,
 :current-size 0,
 :current-position 0,
 :column-types
 {:timestamp :instant, :temperature :float64, :sensor-id :string}}

insert-to-windowed-dataset!

[{:as windowed-dataset, :keys [dataset column-types max-size current-position]} value]

Insert a new row to a WindowedDataset.

Args:

  • windowed-dataset - a WindowedDataset
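  • row - the row to insert, as a map-like structure (a plain map, a record, a FastStruct, etc.); this is the parameter named value in the signature above

Returns: The updated WindowedDataset. The underlying data is mutated in place(!); use the returned value for subsequent calls, as the examples do.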

Example

(let [;; Create windowed dataset
      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :temperature :float64 :sensor-id :string} 5)
      base-time (java-time/instant)

      ;; Insert some data points
      sample-data [{:timestamp base-time :temperature 22.5 :sensor-id "temp-001"}
                   {:timestamp (java-time/plus base-time (java-time/seconds 30)) :temperature 23.1 :sensor-id "temp-001"}
                   {:timestamp (java-time/plus base-time (java-time/seconds 60)) :temperature 22.8 :sensor-id "temp-001"}]

      ;; Insert data step by step
      wd-step1 (wd/insert-to-windowed-dataset! windowed-ds (first sample-data))
      wd-step2 (wd/insert-to-windowed-dataset! wd-step1 (second sample-data))
      final-wd (wd/insert-to-windowed-dataset! wd-step2 (last sample-data))]

  ;; **Windowed dataset after inserting 3 records:**
  ;; Current size: 3
  ;; **Data view:**
  (wd/windowed-dataset->dataset final-wd))

_unnamed [3 3]:

:timestamp :temperature :sensor-id
2025-08-07T23:15:12.051574707Z 22.5 temp-001
2025-08-07T23:15:42.051574707Z 23.1 temp-001
2025-08-07T23:16:12.051574707Z 22.8 temp-001

windowed-dataset-indices

[{:keys [max-size current-size current-position]}]

Extract the row indices for retrieving data from a windowed dataset in insertion order.

This utility function encapsulates the logic for determining which rows to select from the underlying dataset to present data in the correct chronological order.

Args:

  • windowed-dataset - a WindowedDataset

Returns: Vector of integer indices in the correct order for data retrieval
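
One way to derive such an index order is sketched here, under the assumption that rows fill the buffer from index 0 and that current-position points at the oldest row once the buffer is full. The function name chronological-indices is hypothetical, and the library's own implementation may differ.

```clojure
(defn chronological-indices
  "Hypothetical helper: physical row indices in insertion order."
  [{:keys [max-size current-size current-position]}]
  (if (< current-size max-size)
    ;; Not yet full: rows sit at 0..current-size-1 in insertion order.
    (vec (range current-size))
    ;; Full: the oldest row is at current-position; read max-size rows
    ;; from there, wrapping around the end of the buffer.
    (mapv #(mod (+ current-position %) max-size) (range max-size))))

(chronological-indices {:max-size 4 :current-size 4 :current-position 2})
;; => [2 3 0 1]
(chronological-indices {:max-size 5 :current-size 3 :current-position 3})
;; => [0 1 2]
```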

Example

(let [;; Create and populate a small windowed dataset
      windowed-ds (wd/make-windowed-dataset {:value :int32} 4)
      ;; Insert 6 items (will wrap around)
      final-wd (reduce wd/insert-to-windowed-dataset! windowed-ds
                       (map (fn [i] {:value i}) (range 6)))]

  ;; **Windowed dataset with circular buffer behavior:**
  ;; Dataset state: size=4, position=2, max=4
  ;; **Index order for chronological access:**
  {:indices (wd/windowed-dataset-indices final-wd)
   ;; **Data in insertion order:**
   :data (wd/windowed-dataset->dataset final-wd)})

{

:indices [2 3 0 1]
:data

_unnamed [4 1]:

:value
2
3
4
5

}

windowed-dataset->dataset

[{:as windowed-dataset, :keys [dataset]}]

Return a regular dataset as a view over the content of a windowed dataset, with rows in insertion order.

Args:

  • windowed-dataset - a WindowedDataset

Example

(let [;; Create windowed dataset with sample sensor data
      base-time (java-time/instant)
      sensor-readings (map (fn [i reading]
                             {:timestamp (java-time/plus base-time (java-time/seconds (* i 30)))
                              :temperature reading
                              :reading-id i})
                           (range 8)
                           [22.1 22.5 22.8 23.2 22.9 23.1 22.7 22.4])
      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :temperature :float64 :reading-id :int32} 5)
      final-wd (reduce wd/insert-to-windowed-dataset! windowed-ds sensor-readings)]

  ;; **Converting windowed dataset to regular dataset:**
  ;; Inserted 8 temperature readings into 5-capacity window (last 5 retained):
  (wd/windowed-dataset->dataset final-wd))

_unnamed [5 3]:

:timestamp :temperature :reading-id
2025-08-07T23:16:42.058149515Z 23.2 3
2025-08-07T23:17:12.058149515Z 22.9 4
2025-08-07T23:17:42.058149515Z 23.1 5
2025-08-07T23:18:12.058149515Z 22.7 6
2025-08-07T23:18:42.058149515Z 22.4 7

binary-search-timestamp-start

[timestamp-col indices target-time]

Binary search to find the first index where timestamp >= target-time.

Args:

  • timestamp-col - the timestamp column from the dataset
  • indices - vector of indices in chronological order
  • target-time - the target timestamp to search for

Returns: Position in the indices vector of the first entry whose timestamp is >= target-time, or (count indices) if every timestamp is earlier than target-time
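
The search is a standard lower-bound binary search. The following is a minimal sketch of that technique in plain Clojure, assuming timestamps are java.time.Instant values stored in a vector; the name lower-bound-position is hypothetical and this is not the library's source.

```clojure
(import 'java.time.Instant)

(defn lower-bound-position
  "Hypothetical helper: position in indices of the first entry whose
   timestamp is >= target-time, or (count indices) if there is none."
  [timestamp-col indices target-time]
  (loop [lo 0
         hi (count indices)]
    (if (< lo hi)
      (let [mid (quot (+ lo hi) 2)
            ts  (nth timestamp-col (nth indices mid))]
        (if (neg? (compare ts target-time))
          (recur (inc mid) hi)   ; ts < target: answer lies to the right
          (recur lo mid)))       ; ts >= target: mid may be the answer
      lo)))

;; Physical storage order after wrap-around; indices restore chronology.
(def timestamp-col (mapv #(Instant/ofEpochSecond %) [180 240 0 60 120]))
(def indices [2 3 4 0 1])

(lower-bound-position timestamp-col indices (Instant/ofEpochSecond 90))
;; => 2
(lower-bound-position timestamp-col indices (Instant/ofEpochSecond 300))
;; => 5
```

Searching through the indices vector rather than the column itself is what lets the search work on a wrapped buffer, where the physical order of the column is not chronological.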

Example

(let [;; Create sample timestamp data
      base-time (java-time/instant)
      timestamps (map #(java-time/plus base-time (java-time/seconds (* % 60))) (range 5))
      timestamp-col (vec timestamps)
      indices (vec (range 5))

      ;; Search for different target times
      search-cases [[(java-time/plus base-time (java-time/seconds 90)) "Between timestamps"]
                    [(java-time/plus base-time (java-time/seconds 120)) "Exact match"]
                    [(java-time/minus base-time (java-time/seconds 30)) "Before all timestamps"]
                    [(java-time/plus base-time (java-time/seconds 300)) "After all timestamps"]]]

  ;; **Binary search examples:**
  ;; Timestamps: base-time plus 0, 60, 120, 180 and 240 seconds
  (map (fn [[target-time description]]
         {:target-time (str target-time)
          :description description
          :found-position (wd/binary-search-timestamp-start timestamp-col indices target-time)})
       search-cases))
({:target-time "2025-08-07T23:16:42.061505679Z",
  :description "Between timestamps",
  :found-position 2}
 {:target-time "2025-08-07T23:17:12.061505679Z",
  :description "Exact match",
  :found-position 2}
 {:target-time "2025-08-07T23:14:42.061505679Z",
  :description "Before all timestamps",
  :found-position 0}
 {:target-time "2025-08-07T23:20:12.061505679Z",
  :description "After all timestamps",
  :found-position 5})

windowed-dataset->time-window-dataset

[{:as windowed-dataset, :keys [dataset]} timestamp-colname time-window]

Return a regular dataset as a view over the content of a windowed dataset, including only a recent time window. Uses binary search for optimal performance.

Args:

  • windowed-dataset - a WindowedDataset
  • timestamp-colname - the name of the column that contains timestamps
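  • time-window - window length in milliseconds, measured backwards from the most recent timestamp

Returns: Dataset containing only the rows within the specified time window; a row whose timestamp falls exactly on the window boundary is included, as the example output shows

Performance: The start of the window is located by binary search in O(log n) time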
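
The window semantics can be illustrated without the library. This sketch assumes rows are plain maps in chronological order with java.time.Instant timestamps; the name rows-in-time-window is hypothetical, and it uses a linear scan rather than the binary search the library describes.

```clojure
(import 'java.time.Instant)

(defn rows-in-time-window
  "Hypothetical helper: keep the rows whose timestamp is at or after
   (latest timestamp - time-window-ms). Assumes rows is a non-empty
   vector of maps in chronological order."
  [rows timestamp-key time-window-ms]
  (let [^Instant latest (get (peek rows) timestamp-key)
        cutoff (.minusMillis latest time-window-ms)]
    ;; A linear scan for clarity; a binary search would find the same start.
    (filterv #(not (.isBefore ^Instant (get % timestamp-key) cutoff)) rows)))

;; 15 readings, 30 seconds apart.
(def rows
  (mapv (fn [i] {:timestamp (Instant/ofEpochSecond (* i 30)) :reading-id i})
        (range 15)))

(mapv :reading-id (rows-in-time-window rows :timestamp 120000))
;; => [10 11 12 13 14]
(count (rows-in-time-window rows :timestamp 300000))
;; => 11
```

With readings every 30 seconds, a 120000 ms window covers five rows because the row exactly on the cutoff is kept.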

Example

(let [;; Create realistic sensor scenario with timestamps
      base-time (java-time/instant)
      readings [22.1 22.3 21.9 22.5 22.2 22.7 22.0 22.4 22.1 21.8 22.2 22.0 22.6 22.1 22.5]

      ;; Create timestamped data (measurements every 30 seconds)
      sensor-data (map-indexed (fn [i reading]
                                 {:timestamp (java-time/plus base-time (java-time/seconds (* i 30)))
                                  :temperature reading
                                  :reading-id i})
                               readings)

      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :temperature :float64 :reading-id :int32} 20)
      final-wd (reduce wd/insert-to-windowed-dataset! windowed-ds sensor-data)]

  ;; **Time window extraction examples:**
  ;; Created 15 temperature readings spanning 7 minutes (14 intervals of 30 seconds)

  ;; **Last 2 minutes of data:**
  {:last-2-minutes (wd/windowed-dataset->time-window-dataset final-wd :timestamp 120000)

   ;; **Last 5 minutes of data:**
   :last-5-minutes (wd/windowed-dataset->time-window-dataset final-wd :timestamp 300000)

   ;; **All data (10-minute window):**
   :all-data (-> (wd/windowed-dataset->time-window-dataset final-wd :timestamp 600000)
                 (tc/select-columns [:reading-id :temperature]))})

{

:last-2-minutes

_unnamed [5 3]:

:timestamp :temperature :reading-id
2025-08-07T23:20:12.074294834Z 22.2 10
2025-08-07T23:20:42.074294834Z 22.0 11
2025-08-07T23:21:12.074294834Z 22.6 12
2025-08-07T23:21:42.074294834Z 22.1 13
2025-08-07T23:22:12.074294834Z 22.5 14
:last-5-minutes

_unnamed [11 3]:

:timestamp :temperature :reading-id
2025-08-07T23:17:12.074294834Z 22.2 4
2025-08-07T23:17:42.074294834Z 22.7 5
2025-08-07T23:18:12.074294834Z 22.0 6
2025-08-07T23:18:42.074294834Z 22.4 7
2025-08-07T23:19:12.074294834Z 22.1 8
2025-08-07T23:19:42.074294834Z 21.8 9
2025-08-07T23:20:12.074294834Z 22.2 10
2025-08-07T23:20:42.074294834Z 22.0 11
2025-08-07T23:21:12.074294834Z 22.6 12
2025-08-07T23:21:42.074294834Z 22.1 13
2025-08-07T23:22:12.074294834Z 22.5 14
:all-data

_unnamed [15 2]:

:reading-id :temperature
0 22.1
1 22.3
2 21.9
3 22.5
4 22.2
5 22.7
6 22.0
7 22.4
8 22.1
9 21.8
10 22.2
11 22.0
12 22.6
13 22.1
14 22.5

}

copy-windowed-dataset

[{:as windowed-dataset, :keys [dataset column-types max-size current-size current-position]}]

Create a deep copy of a windowed dataset.

Args:

  • windowed-dataset - a WindowedDataset

Returns: New WindowedDataset with copied data

Example

(let [;; Create and populate a windowed dataset
      base-time (java-time/instant)
      original-data [{:timestamp base-time :temperature 22.5}
                     {:timestamp (java-time/plus base-time (java-time/seconds 30)) :temperature 23.1}
                     {:timestamp (java-time/plus base-time (java-time/seconds 60)) :temperature 22.8}]

      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :temperature :float64} 5)
      populated-wd (reduce wd/insert-to-windowed-dataset! windowed-ds original-data)

      ;; Create a deep copy
      copied-wd (wd/copy-windowed-dataset populated-wd)]

  ;; **Deep copy windowed dataset example:**
  {:original-state {:size (:current-size populated-wd)
                    :position (:current-position populated-wd)}
   :copied-state {:size (:current-size copied-wd)
                  :position (:current-position copied-wd)}
   :data-identical (= (tc/rows (wd/windowed-dataset->dataset populated-wd))
                      (tc/rows (wd/windowed-dataset->dataset copied-wd)))})
{:original-state {:size 3, :position 3},
 :copied-state {:size 3, :position 3},
 :data-identical true}

add-column-by-windowed-fn

[time-series {:keys [colname windowed-fn windowed-dataset-size]}]

Add a new column to a time-series by applying a windowed function progressively.

This function simulates real-time streaming analysis on historical time-series data. For each row in the time-series (processed in timestamp order), it:

  1. Inserts the row into a growing windowed dataset
  2. Applies the windowed function to calculate a result
  3. Uses that result as the column value for that row

This bridges the gap between streaming windowed analysis and batch processing of existing time-series data, allowing you to see how metrics evolve over time as if the data were being processed in real-time.

Args:

  • time-series - a tablecloth dataset with timestamp-ordered data
  • options - map with keys:
    • :colname - name of the new column to add
    • :windowed-fn - function that takes a WindowedDataset and returns a value
    • :windowed-dataset-size - size of the windowed dataset buffer (currently ignored, uses 120)

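Returns: The original time-series with the new column added, where each row holds the result of applying the windowed function to the data accumulated up to that point. In the example output below, the first row of the new column is empty and each later value reflects only the rows before it (the 10.0 in the second row is the average of the first value alone).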

Use Cases:

  • Adding progressive metrics to time-series
  • Creating trend analysis columns that consider historical context
  • Simulating real-time algorithm behavior on historical data
  • Generating training data with progressive features for ML models
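
The progressive evaluation described above can be sketched with reductions over plain vectors. This is an illustration only, not the library's implementation: it uses a growing vector in place of a WindowedDataset, evaluates the function after each row is added, and the names progressive-values and mean are hypothetical.

```clojure
(defn progressive-values
  "Apply f to every non-empty prefix of rows, in order."
  [f rows]
  (->> rows
       (reductions conj [])   ; [], [r0], [r0 r1], ...
       rest                   ; drop the empty prefix
       (mapv f)))

(defn mean
  "Arithmetic mean of a non-empty collection of numbers."
  [xs]
  (/ (reduce + xs) (count xs)))

(progressive-values mean [10.0 20.0 15.0 25.0])
;; => [10.0 15.0 15.0 17.5]
```

The first three values (10.0, 15.0, 15.0) are the same numbers that appear in the :moving-avg column of the example output, where they are listed one row later.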

Examples

(let [time-series (tc/dataset {:timestamp [(java-time/instant)
                                           (java-time/plus (java-time/instant) (java-time/seconds 30))
                                           (java-time/plus (java-time/instant) (java-time/seconds 60))
                                           (java-time/plus (java-time/instant) (java-time/seconds 90))]
                               :value [10.0 20.0 15.0 25.0]})

      ;; Define a simple moving average function
      moving-avg-fn (fn [windowed-ds]
                      (let [regular-ds (wd/windowed-dataset->dataset windowed-ds)
                            values (:value regular-ds)]
                        (when (seq values)
                          (/ (reduce + values) (count values)))))

      result (wd/add-column-by-windowed-fn time-series
                                           {:colname :moving-avg
                                            :windowed-fn moving-avg-fn
                                            :windowed-dataset-size 10})]
  (tc/select-columns result [:timestamp :value :moving-avg]))

_unnamed [4 3]:

:timestamp :value :moving-avg
2025-08-07T23:15:12.088482Z 10.0
2025-08-07T23:15:42.088484Z 20.0 10.0
2025-08-07T23:16:12.088494Z 15.0 15.0
2025-08-07T23:16:42.088496Z 25.0 15.0

Smoothing Functions

moving-average

[windowed-dataset window-size value-colname]

Calculate simple moving average of recent data in windowed dataset.

Args:

  • windowed-dataset - a WindowedDataset
  • window-size - number of recent samples to average
  • value-colname - column name containing values to be processed

Returns: Moving average of the most recent window-size samples, or nil if insufficient data
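
The calculation itself is small enough to sketch over a plain vector. The name simple-moving-average is hypothetical and the sketch assumes a positive window-size; it is not the library's source.

```clojure
(defn simple-moving-average
  "Hypothetical helper: mean of the last window-size values,
   or nil when fewer than window-size values are available."
  [values window-size]
  (when (>= (count values) window-size)
    (/ (reduce + (take-last window-size values)) window-size)))

(simple-moving-average [800 850 820] 3)
;; => 2470/3
(simple-moving-average [800 850 820] 5)
;; => nil
```

With integer inputs Clojure's division yields an exact ratio, which is why the library example prints 2470/3; wrapping the result in double gives a decimal value instead.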

Example

(let [wd (wd/make-windowed-dataset {:x :int32} 10)
      data [{:x 800} {:x 850} {:x 820}]
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]
  (wd/moving-average populated-wd 3 :x))
2470/3

median-filter

[windowed-dataset window-size value-colname]

Apply median filter to the most recent data in a windowed dataset.

Args:

  • windowed-dataset - a WindowedDataset
  • window-size - number of recent samples to use for median calculation
  • value-colname - column name containing values to be processed

Returns: Median value of the most recent window-size samples, or nil if insufficient data
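
A minimal sketch of the same idea over a plain vector follows. It assumes an odd, positive window-size so that the median is a single middle element; the name median-of-recent is hypothetical, and how the library treats even window sizes is not covered here.

```clojure
(defn median-of-recent
  "Hypothetical helper: median of the last window-size values (odd
   window-size assumed), or nil when too few values are available."
  [values window-size]
  (when (>= (count values) window-size)
    (let [sorted (sort (take-last window-size values))]
      (nth sorted (quot window-size 2)))))

(median-of-recent [800 1200 820] 3)
;; => 820
(median-of-recent [815 2000 812 808 795] 5)
;; => 812
(median-of-recent [800 1200] 3)
;; => nil
```

Unlike an average, the median ignores how extreme an outlier is: 1200 and 2000 have no influence on the results above.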

Example

(let [wd (wd/make-windowed-dataset {:x :int32} 10)
      data [{:x 800} {:x 1200} {:x 820}] ; middle value is outlier
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]
  (wd/median-filter populated-wd 3 :x))
820

cascaded-median-filter

[windowed-dataset value-colname]

Apply cascaded median filters (3-point then 5-point) for robust smoothing.

Args:

  • windowed-dataset - a WindowedDataset
  • value-colname - column name containing values to be processed

Returns: Cascaded median filtered value, or nil if insufficient data (needs 5+ samples)

Example

(let [wd (wd/make-windowed-dataset {:x :int32} 10)
      data [{:x 800} {:x 1200} {:x 820} {:x 1100} {:x 810}]
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]
  (wd/cascaded-median-filter populated-wd :x))
820

exponential-moving-average

[windowed-dataset alpha value-colname]

Calculate exponential moving average of recent data in windowed dataset.

Args:

  • windowed-dataset - a WindowedDataset
  • alpha - smoothing factor (0 < alpha <= 1, higher = more responsive)
  • value-colname - column name containing values to be processed

Returns: EMA value, or nil if no data available
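
The usual EMA recurrence is ema <- alpha * x + (1 - alpha) * ema. The sketch below applies it to a plain vector and seeds the average with the first value; that seeding is an assumption (it reproduces the 816.5 shown in the example that follows, but the library's choice is not stated here), and the name ema is hypothetical.

```clojure
(defn ema
  "Hypothetical helper: seed with the first value, then fold
   ema <- alpha * x + (1 - alpha) * ema over the remaining values.
   Returns nil for an empty collection."
  [alpha values]
  (when (seq values)
    (reduce (fn [acc x] (+ (* alpha x) (* (- 1 alpha) acc)))
            (first values)
            (rest values))))

;; Step by step for alpha = 0.3:
;;   800
;;   0.3 * 850 + 0.7 * 800 = 815
;;   0.3 * 820 + 0.7 * 815 = 816.5 (up to floating-point rounding)
(ema 0.3 [800 850 820])

(ema 0.3 [])
;; => nil
```

A larger alpha weights the newest sample more heavily; with alpha equal to 1 the result is simply the last value.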

Example

(let [wd (wd/make-windowed-dataset {:x :int32} 10)
      data [{:x 800} {:x 850} {:x 820}]
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]
  (wd/exponential-moving-average populated-wd 0.3 :x))
816.5

cascaded-smoothing-filter

[windowed-dataset median-window ma-window value-colname]

Apply cascaded smoothing: median filter followed by moving average.

This combines the outlier rejection of median filtering with the noise reduction of a moving average.

Args:

  • windowed-dataset - a WindowedDataset
  • median-window - window size for median filter
  • ma-window - window size for moving average
  • value-colname - column name containing values to be processed

Returns: Final smoothed value, or nil if insufficient data

Example

(let [wd (wd/make-windowed-dataset {:x :int32} 15)
      ;; Data with noise and outliers
      data [{:x 800} {:x 820} {:x 1500} {:x 810}
            {:x 805} {:x 815} {:x 2000} {:x 812}
            {:x 808} {:x 795}]
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]

  ;; Compare cascaded smoothing with individual methods
  {:median-only (wd/median-filter populated-wd 5 :x)
   :moving-avg-only (wd/moving-average populated-wd 5 :x)
   :cascaded-5-3 (wd/cascaded-smoothing-filter populated-wd 5 3 :x)})
{:median-only 812, :moving-avg-only 1046, :cascaded-5-3 805.0}