API Reference
(ns windowed-dataset.api-reference
  (:require [scicloj.windowed-dataset.api :as wd]
            [tablecloth.api :as tc]
            [java-time.api :as java-time]
            [clojure.string :as str]
            [scicloj.kindly.v4.api :as kindly]
            [scicloj.kindly.v4.kind :as kind]))
WindowedDataset Record
The WindowedDataset record implements a circular buffer data structure optimized for time-series analysis:
(defrecord WindowedDataset
  [dataset            ; tech.v3.dataset containing the actual data
   column-types       ; map of column names to data types
   max-size           ; maximum number of rows the buffer can hold
   current-size       ; current number of rows (0 to max-size)
   current-position]) ; current write position (circular index)
Key characteristics:
- Mutable - Designed for performance in streaming scenarios
- Fixed memory - Pre-allocates space for predictable memory usage
- Circular buffer - Automatically overwrites oldest data when full
- Chronological access - Data is always returned in insertion order
- Zero-copy views - Time windows are extracted without copying data
Typical workflow:
- Create with make-windowed-dataset, specifying column types and buffer size
- Insert streaming data with insert-to-windowed-dataset!
- Extract time windows with windowed-dataset->time-window-dataset
- Compute metrics over specific time periods
WindowedDataset Structure Example
(let [;; Create a windowed dataset to examine its structure
      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :value :float64} 3)
      base-time (java-time/instant)
      ;; Add one data point to see the structure
      wd-with-data (wd/insert-to-windowed-dataset! windowed-ds {:timestamp base-time :value 42.5})]
  ;; **WindowedDataset Record Fields:**
  {:dataset "tech.v3.dataset (Internal data storage)"
   :column-types (:column-types wd-with-data)
   :max-size (:max-size wd-with-data)
   :current-size (:current-size wd-with-data)
   :current-position (:current-position wd-with-data)})

{:dataset "tech.v3.dataset (Internal data storage)",
 :column-types {:timestamp :instant, :value :float64},
 :max-size 3,
 :current-size 1,
 :current-position 1}
Circular Buffer Behavior
(let [;; Demonstrate circular buffer behavior
      small-wd (wd/make-windowed-dataset {:value :int32} 3)
      ;; Fill beyond capacity to show circular behavior
      test-data (map (fn [i] {:value i}) (range 5))
      final-wd (reduce wd/insert-to-windowed-dataset! small-wd test-data)]
  ;; **Circular Buffer Example (capacity: 3, inserted: 5 values):**
  ;; Final state: size=3, position=2 (values 0,1 were overwritten by 3,4)
  ;; **Data in chronological order:**
  (wd/windowed-dataset->dataset final-wd))

_unnamed [3 1]:
:value |
---|
2 |
3 |
4 |
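The wrap-around shown above can be modelled with plain Clojure data. The following is a minimal sketch, not the library's implementation: make-ring and ring-insert are hypothetical names, and the model is an immutable map, whereas the real record mutates a tech.v3.dataset in place.

```clojure
;; Hypothetical plain-data model of the circular buffer (not the library code).
(defn make-ring [max-size]
  {:slots (vec (repeat max-size nil))
   :max-size max-size
   :current-size 0
   :current-position 0})

(defn ring-insert
  [{:keys [max-size current-size current-position] :as ring} row]
  (assoc ring
         ;; overwrite whatever sits at the write position
         :slots (assoc (:slots ring) current-position row)
         ;; size grows until it reaches capacity, then stays there
         :current-size (min max-size (inc current-size))
         ;; advance the write position, wrapping to 0 at the end
         :current-position (mod (inc current-position) max-size)))

(def ring (reduce ring-insert (make-ring 3) (range 5)))

(select-keys ring [:slots :current-size :current-position])
;; => {:slots [3 4 2], :current-size 3, :current-position 2}
```

Slots 0 and 1 now hold 3 and 4, which is why the chronological view has to start reading at position 2.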
make-windowed-dataset
[column-types max-size]
Create an empty WindowedDataset with a given max-size and given column-types (map).
Args:
- column-types - a map from column name to type
- max-size - maximal window size to keep
Returns: The specified WindowedDataset structure.
Example
(let [;; Create a windowed dataset for sensor data with 10-sample capacity
      column-spec {:timestamp :instant
                   :temperature :float64
                   :sensor-id :string}
      windowed-ds (wd/make-windowed-dataset column-spec 10)]
  ;; **Created windowed dataset:**
  {:max-size (:max-size windowed-ds)
   :current-size (:current-size windowed-ds)
   :current-position (:current-position windowed-ds)
   :column-types (:column-types windowed-ds)})

{:max-size 10,
 :current-size 0,
 :current-position 0,
 :column-types
 {:timestamp :instant, :temperature :float64, :sensor-id :string}}
insert-to-windowed-dataset!
[{:as windowed-dataset, :keys [dataset column-types max-size current-position]} value]
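Insert a new row into a WindowedDataset.
Args:
- windowed-dataset - a WindowedDataset
- row - a row represented as a map structure (can be a record or FastStruct, etc.); this is the argument named value in the signature above
Returns: The updated windowed dataset, with its underlying data mutated in place (hence the ! in the name).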
Example
(let [;; Create windowed dataset
      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :temperature :float64 :sensor-id :string} 5)
      base-time (java-time/instant)
      ;; Insert some data points
      sample-data [{:timestamp base-time :temperature 22.5 :sensor-id "temp-001"}
                   {:timestamp (java-time/plus base-time (java-time/seconds 30)) :temperature 23.1 :sensor-id "temp-001"}
                   {:timestamp (java-time/plus base-time (java-time/seconds 60)) :temperature 22.8 :sensor-id "temp-001"}]
      ;; Insert data step by step
      wd-step1 (wd/insert-to-windowed-dataset! windowed-ds (first sample-data))
      wd-step2 (wd/insert-to-windowed-dataset! wd-step1 (second sample-data))
      final-wd (wd/insert-to-windowed-dataset! wd-step2 (last sample-data))]
  ;; **Windowed dataset after inserting 3 records:**
  ;; Current size: 3
  ;; **Data view:**
  (wd/windowed-dataset->dataset final-wd))

_unnamed [3 3]:
:timestamp | :temperature | :sensor-id |
---|---|---|
2025-08-07T23:15:12.051574707Z | 22.5 | temp-001 |
2025-08-07T23:15:42.051574707Z | 23.1 | temp-001 |
2025-08-07T23:16:12.051574707Z | 22.8 | temp-001 |
windowed-dataset-indices
[{:keys [max-size current-size current-position]}]
Extract the row indices for retrieving data from a windowed dataset in insertion order.
This utility function encapsulates the logic for determining which rows to select from the underlying dataset to present data in the correct chronological order.
Args:
- windowed-dataset - a WindowedDataset
Returns: Vector of integer indices in the correct order for data retrieval
Example
(let [;; Create and populate a small windowed dataset
      windowed-ds (wd/make-windowed-dataset {:value :int32} 4)
      ;; Insert 6 items (will wrap around)
      final-wd (reduce wd/insert-to-windowed-dataset! windowed-ds
                       (map (fn [i] {:value i}) (range 6)))]
  ;; **Windowed dataset with circular buffer behavior:**
  ;; Dataset state: size=4, position=2, max=4
  ;; **Index order for chronological access:**
  {:indices (wd/windowed-dataset-indices final-wd)
   ;; **Data in insertion order:**
   :data (wd/windowed-dataset->dataset final-wd)})

{:indices [2 3 0 1]
 :data _unnamed [4 1]}
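The index order in the output above follows from the three fields the function destructures. A minimal sketch of that logic, assuming the buffer fills slots 0..n-1 before wrapping; ring-indices is a hypothetical name and the library's own code may differ:

```clojure
;; Hypothetical re-derivation of the chronological index order.
(defn ring-indices
  [{:keys [max-size current-size current-position]}]
  (if (< current-size max-size)
    ;; not yet wrapped: rows 0..size-1 are already in insertion order
    (vec (range current-size))
    ;; wrapped: the oldest row sits at the write position,
    ;; followed by the rows that wrapped around to the front
    (vec (concat (range current-position max-size)
                 (range current-position)))))

(ring-indices {:max-size 4 :current-size 4 :current-position 2})
;; => [2 3 0 1]
(ring-indices {:max-size 4 :current-size 2 :current-position 2})
;; => [0 1]
```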
windowed-dataset->dataset
[{:as windowed-dataset, :keys [dataset]}]
Return a regular dataset as a view over the content of a windowed dataset.
Args:
- windowed-dataset - a WindowedDataset
Example
(let [;; Create windowed dataset with sample sensor data
      base-time (java-time/instant)
      sensor-readings (map (fn [i reading]
                             {:timestamp (java-time/plus base-time (java-time/seconds (* i 30)))
                              :temperature reading
                              :reading-id i})
                           (range 8)
                           [22.1 22.5 22.8 23.2 22.9 23.1 22.7 22.4])
      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :temperature :float64 :reading-id :int32} 5)
      final-wd (reduce wd/insert-to-windowed-dataset! windowed-ds sensor-readings)]
  ;; **Converting windowed dataset to regular dataset:**
  ;; Inserted 8 temperature readings into 5-capacity window (last 5 retained):
  (wd/windowed-dataset->dataset final-wd))

_unnamed [5 3]:
:timestamp | :temperature | :reading-id |
---|---|---|
2025-08-07T23:16:42.058149515Z | 23.2 | 3 |
2025-08-07T23:17:12.058149515Z | 22.9 | 4 |
2025-08-07T23:17:42.058149515Z | 23.1 | 5 |
2025-08-07T23:18:12.058149515Z | 22.7 | 6 |
2025-08-07T23:18:42.058149515Z | 22.4 | 7 |
binary-search-timestamp-start
[timestamp-col indices target-time]
Binary search to find the first index where timestamp >= target-time.
Args:
- timestamp-col - the timestamp column from the dataset
- indices - vector of indices in chronological order
- target-time - the target timestamp to search for
Returns: Index in the indices vector where the search should start
Example
(let [;; Create sample timestamp data
      base-time (java-time/instant)
      timestamps (map #(java-time/plus base-time (java-time/seconds (* % 60))) (range 5))
      timestamp-col (vec timestamps)
      indices (vec (range 5))
      ;; Search for different target times
      search-cases [[(java-time/plus base-time (java-time/seconds 90)) "Between timestamps"]
                    [(java-time/plus base-time (java-time/seconds 120)) "Exact match"]
                    [(java-time/minus base-time (java-time/seconds 30)) "Before all timestamps"]
                    [(java-time/plus base-time (java-time/seconds 300)) "After all timestamps"]]]
  ;; **Binary search examples:**
  ;; Timestamps: [formatted times]
  (map (fn [[target-time description]]
         {:target-time (str target-time)
          :description description
          :found-position (wd/binary-search-timestamp-start timestamp-col indices target-time)})
       search-cases))

({:target-time "2025-08-07T23:16:42.061505679Z",
  :description "Between timestamps",
  :found-position 2}
 {:target-time "2025-08-07T23:17:12.061505679Z",
  :description "Exact match",
  :found-position 2}
 {:target-time "2025-08-07T23:14:42.061505679Z",
  :description "Before all timestamps",
  :found-position 0}
 {:target-time "2025-08-07T23:20:12.061505679Z",
  :description "After all timestamps",
  :found-position 5})
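The four positions above are what a standard lower-bound binary search returns. A minimal sketch under that assumption: lower-bound is a hypothetical name, and plain second offsets stand in for the Instant values (compare works on both).

```clojure
;; Hypothetical lower-bound search: first position p in `indices` with
;; (nth col (nth indices p)) >= target, or (count indices) if there is none.
(defn lower-bound
  [col indices target]
  (loop [lo 0
         hi (count indices)]
    (if (< lo hi)
      (let [mid (quot (+ lo hi) 2)]
        (if (neg? (compare (nth col (nth indices mid)) target))
          (recur (inc mid) hi) ; value at mid is too early: search the right half
          (recur lo mid)))     ; value at mid qualifies: keep it, search the left half
      lo)))

(def col [0 60 120 180 240]) ; offsets in seconds, standing in for timestamps
(def idx (vec (range 5)))

;; between, exact match, before all, after all
(mapv #(lower-bound col idx %) [90 120 -30 300])
;; => [2 2 0 5]
```

A result equal to (count indices) means no row is at or after the target, so callers need to treat it as an empty selection.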
windowed-dataset->time-window-dataset
[{:as windowed-dataset, :keys [dataset]} timestamp-colname time-window]
Return a regular dataset as a view over the content of a windowed dataset, including only a recent time window. Uses binary search for optimal performance.
Args:
- windowed-dataset - a WindowedDataset
- timestamp-colname - the name of the column that contains timestamps
- time-window - window length in ms (from most recent timestamp backwards)
Returns: Dataset containing only data within the specified time window
Performance: O(log n) time complexity using binary search
Example
(let [;; Create realistic sensor scenario with timestamps
      base-time (java-time/instant)
      readings [22.1 22.3 21.9 22.5 22.2 22.7 22.0 22.4 22.1 21.8 22.2 22.0 22.6 22.1 22.5]
      ;; Create timestamped data (measurements every 30 seconds)
      sensor-data (map-indexed (fn [i reading]
                                 {:timestamp (java-time/plus base-time (java-time/seconds (* i 30)))
                                  :temperature reading
                                  :reading-id i})
                               readings)
      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :temperature :float64 :reading-id :int32} 20)
      final-wd (reduce wd/insert-to-windowed-dataset! windowed-ds sensor-data)]
  ;; **Time window extraction examples:**
  ;; Created 15 temperature readings over ~7.5 minutes
  ;; **Last 2 minutes of data:**
  {:last-2-minutes (wd/windowed-dataset->time-window-dataset final-wd :timestamp 120000)
   ;; **Last 5 minutes of data:**
   :last-5-minutes (wd/windowed-dataset->time-window-dataset final-wd :timestamp 300000)
   ;; **All data (10-minute window):**
   :all-data (-> (wd/windowed-dataset->time-window-dataset final-wd :timestamp 600000)
                 (tc/select-columns [:reading-id :temperature]))})

{:last-2-minutes _unnamed [5 3]
 :last-5-minutes _unnamed [11 3]
 :all-data _unnamed [15 2]}
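The row counts above (5, 11 and 15) are consistent with a cutoff of latest timestamp minus window length, with the boundary row included. A minimal sketch of that selection on plain maps, assuming rows are already in chronological order; time-window and the :t key (epoch milliseconds) are hypothetical:

```clojure
;; Hypothetical time-window selection over a sorted vector of row maps.
(defn time-window
  [rows window-ms]
  (if (empty? rows)
    []
    (let [cutoff (- (:t (peek rows)) window-ms)] ; latest timestamp minus window
      ;; binary search for the first row with :t >= cutoff
      (loop [lo 0
             hi (count rows)]
        (if (< lo hi)
          (let [mid (quot (+ lo hi) 2)]
            (if (< (:t (nth rows mid)) cutoff)
              (recur (inc mid) hi)
              (recur lo mid)))
          (subvec rows lo)))))) ; subvec shares structure with rows

;; 15 readings, one every 30 seconds, as in the example above
(def rows (mapv (fn [i] {:t (* i 30000) :reading-id i}) (range 15)))

(mapv #(count (time-window rows %)) [120000 300000 600000])
;; => [5 11 15]
```

Only locating the cutoff is logarithmic here; anything that then walks the selected rows is linear in the size of the window.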
copy-windowed-dataset
[{:as windowed-dataset, :keys [dataset column-types max-size current-size current-position]}]
Create a deep copy of a windowed dataset.
Args:
- windowed-dataset - a WindowedDataset
Returns: New WindowedDataset with copied data
Example
(let [;; Create and populate a windowed dataset
      base-time (java-time/instant)
      original-data [{:timestamp base-time :temperature 22.5}
                     {:timestamp (java-time/plus base-time (java-time/seconds 30)) :temperature 23.1}
                     {:timestamp (java-time/plus base-time (java-time/seconds 60)) :temperature 22.8}]
      windowed-ds (wd/make-windowed-dataset {:timestamp :instant :temperature :float64} 5)
      populated-wd (reduce wd/insert-to-windowed-dataset! windowed-ds original-data)
      ;; Create a deep copy
      copied-wd (wd/copy-windowed-dataset populated-wd)]
  ;; **Deep copy windowed dataset example:**
  {:original-state {:size (:current-size populated-wd)
                    :position (:current-position populated-wd)}
   :copied-state {:size (:current-size copied-wd)
                  :position (:current-position copied-wd)}
   :data-identical (= (tc/rows (wd/windowed-dataset->dataset populated-wd))
                      (tc/rows (wd/windowed-dataset->dataset copied-wd)))})

{:original-state {:size 3, :position 3},
 :copied-state {:size 3, :position 3},
 :data-identical true}
add-column-by-windowed-fn
[time-series {:keys [colname windowed-fn windowed-dataset-size]}]
Add a new column to a time-series by applying a windowed function progressively.
This function simulates real-time streaming analysis on historical time-series data. For each row in the time-series (processed in timestamp order), it:
- Inserts the row into a growing windowed dataset
- Applies the windowed function to calculate a result
- Uses that result as the column value for that row
This bridges the gap between streaming windowed analysis and batch processing of existing time-series data, allowing you to see how metrics evolve over time as if the data were being processed in real-time.
Args:
- time-series - a tablecloth dataset with timestamp-ordered data
- options - map with keys:
  - :colname - name of the new column to add
  - :windowed-fn - function that takes a WindowedDataset and returns a value
  - :windowed-dataset-size - size of the windowed dataset buffer (currently ignored, uses 120)
Returns: The original time-series with the new column added, where each row contains the result of applying the windowed function to all data up to that timestamp
Use Cases:
- Adding progressive metrics to time-series
- Creating trend analysis columns that consider historical context
- Simulating real-time algorithm behavior on historical data
- Generating training data with progressive features for ML models
Examples
(let [time-series (tc/dataset {:timestamp [(java-time/instant)
                                           (java-time/plus (java-time/instant) (java-time/seconds 30))
                                           (java-time/plus (java-time/instant) (java-time/seconds 60))
                                           (java-time/plus (java-time/instant) (java-time/seconds 90))]
                               :value [10.0 20.0 15.0 25.0]})
      ;; Define a simple moving average function
      moving-avg-fn (fn [windowed-ds]
                      (let [regular-ds (wd/windowed-dataset->dataset windowed-ds)
                            values (:value regular-ds)]
                        (when (seq values)
                          (/ (reduce + values) (count values)))))
      result (wd/add-column-by-windowed-fn time-series
                                           {:colname :moving-avg
                                            :windowed-fn moving-avg-fn
                                            :windowed-dataset-size 10})]
  (tc/select-columns result [:timestamp :value :moving-avg]))

_unnamed [4 3]:
:timestamp | :value | :moving-avg |
---|---|---|
2025-08-07T23:15:12.088482Z | 10.0 | |
2025-08-07T23:15:42.088484Z | 20.0 | 10.0 |
2025-08-07T23:16:12.088494Z | 15.0 | 15.0 |
2025-08-07T23:16:42.088496Z | 25.0 | 15.0 |
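In the table above the first row's :moving-avg is empty, and each later value equals the mean of the rows before it. The sketch below reproduces that ordering on plain maps by applying the function to the accumulated history before adding the current row. This ordering is inferred from the table rather than from the library source; progressive-column and mean-value are hypothetical names, and the history here is unbounded rather than a fixed-size window.

```clojure
;; Hypothetical progressive evaluation over a sequence of row maps.
(defn progressive-column
  [rows colname f]
  (second
   (reduce (fn [[history out] row]
             [(conj history row)                          ; history grows after f is applied
              (conj out (assoc row colname (f history)))]) ; f sees only earlier rows
           [[] []]
           rows)))

(defn mean-value [history]
  (when (seq history)
    (/ (reduce + (map :value history)) (count history))))

(mapv :moving-avg
      (progressive-column (mapv (fn [v] {:value v}) [10.0 20.0 15.0 25.0])
                          :moving-avg
                          mean-value))
;; => [nil 10.0 15.0 15.0]
```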
Smoothing Functions
moving-average
[windowed-dataset window-size value-colname]
Calculate simple moving average of recent data in windowed dataset.
Args:
- windowed-dataset - a WindowedDataset
- window-size - number of recent samples to average
- value-colname - column name containing values to be processed
Returns: Moving average of the most recent window-size samples, or nil if insufficient data
Example
(let [wd (wd/make-windowed-dataset {:x :int32} 10)
      data [{:x 800} {:x 850} {:x 820}]
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]
  (wd/moving-average populated-wd 3 :x))

2470/3
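The result above is a Clojure ratio because the inputs are integers. A minimal sketch of the same arithmetic on a plain vector; moving-average* is a hypothetical stand-in, not the library function:

```clojure
;; Hypothetical stand-in: mean of the last n values, or nil if fewer than n.
(defn moving-average*
  [values n]
  (when (>= (count values) n)
    (/ (reduce + (take-last n values)) n)))

(moving-average* [800 850 820] 3)          ;; => 2470/3
(moving-average* [800 850] 3)              ;; => nil
(double (moving-average* [800 850 820] 3)) ;; a floating-point value near 823.33
```

Wrapping the call in double (or storing the column as :float64) avoids carrying ratios into downstream arithmetic.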
median-filter
[windowed-dataset window-size value-colname]
Apply median filter to the most recent data in a windowed dataset.
Args:
- windowed-dataset - a WindowedDataset
- window-size - number of recent samples to use for median calculation
- value-colname - column name containing values to be processed
Returns: Median value of the most recent window-size samples, or nil if insufficient data
Example
(let [wd (wd/make-windowed-dataset {:x :int32} 10)
      data [{:x 800} {:x 1200} {:x 820}] ; middle value is outlier
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]
  (wd/median-filter populated-wd 3 :x))

820
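The outlier 1200 drops out because the median only depends on the middle of the sorted window. A minimal sketch on a plain vector, assuming n is at least 1; median-of-recent is a hypothetical name, and averaging the two middle values for even windows is an assumption that the source does not confirm.

```clojure
;; Hypothetical stand-in: median of the last n values, or nil if fewer than n.
(defn median-of-recent
  [values n]
  (when (>= (count values) n)
    (let [sorted (vec (sort (take-last n values)))
          mid (quot n 2)]
      (if (odd? n)
        (nth sorted mid)
        ;; even window: average the two middle values (assumed convention)
        (/ (+ (nth sorted (dec mid)) (nth sorted mid)) 2)))))

(median-of-recent [800 1200 820] 3) ;; => 820
(median-of-recent [800 1200] 3)     ;; => nil
```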
cascaded-median-filter
[windowed-dataset value-colname]
Apply cascaded median filters (3-point then 5-point) for robust smoothing.
Args:
- windowed-dataset - a WindowedDataset
- value-colname - column name containing values to be processed
Returns: Cascaded median filtered value, or nil if insufficient data (needs 5+ samples)
Example
(let [wd (wd/make-windowed-dataset {:x :int32} 10)
      data [{:x 800} {:x 1200} {:x 820} {:x 1100} {:x 810}]
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]
  (wd/cascaded-median-filter populated-wd :x))

820
exponential-moving-average
[windowed-dataset alpha value-colname]
Calculate exponential moving average of recent data in windowed dataset.
Args:
- windowed-dataset - a WindowedDataset
- alpha - smoothing factor (0 < alpha <= 1, higher = more responsive)
- value-colname - column name containing values to be processed
Returns: EMA value, or nil if no data available
Example
(let [wd (wd/make-windowed-dataset {:x :int32} 10)
      data [{:x 800} {:x 850} {:x 820}]
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]
  (wd/exponential-moving-average populated-wd 0.3 :x))

816.5
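The value 816.5 is what the usual recurrence ema = alpha * x + (1 - alpha) * ema gives when the EMA is seeded with the first sample: 800, then 0.3 * 850 + 0.7 * 800 = 815, then 0.3 * 820 + 0.7 * 815 = 816.5. A minimal sketch of that recurrence on a plain vector; ema is a hypothetical name and the seeding choice is inferred from the output above, not from the library source.

```clojure
;; Hypothetical stand-in: exponential moving average seeded with the first value.
(defn ema
  [alpha values]
  (when (seq values)
    (reduce (fn [acc x]
              ;; new sample weighted by alpha, running average by (1 - alpha)
              (+ (* alpha x) (* (- 1 alpha) acc)))
            (first values)
            (rest values))))

(ema 0.3 [800 850 820]) ;; => 816.5
(ema 0.3 [])            ;; => nil
```

With alpha equal to 1 the result is simply the latest sample, which matches the "higher = more responsive" note in the argument list.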
cascaded-smoothing-filter
[windowed-dataset median-window ma-window value-colname]
Apply cascaded smoothing: median filter followed by moving average.
This combines the outlier-removal power of median filtering with the noise-reduction benefits of moving averages for comprehensive cleaning.
Args:
- windowed-dataset - a WindowedDataset
- median-window - window size for median filter
- ma-window - window size for moving average
- value-colname - column name containing values to be processed
Returns: Final smoothed value, or nil if insufficient data
Example
(let [wd (wd/make-windowed-dataset {:x :int32} 15)
      ;; Data with noise and outliers
      data [{:x 800} {:x 820} {:x 1500} {:x 810}
            {:x 805} {:x 815} {:x 2000} {:x 812}
            {:x 808} {:x 795}]
      populated-wd (reduce wd/insert-to-windowed-dataset! wd data)]
  ;; Compare cascaded smoothing with individual methods
  {:median-only (wd/median-filter populated-wd 5 :x)
   :moving-avg-only (wd/moving-average populated-wd 5 :x)
   :cascaded-5-3 (wd/cascaded-smoothing-filter populated-wd 5 3 :x)})

{:median-only 812, :moving-avg-only 1046, :cascaded-5-3 805.0}
source: notebooks/api_reference.clj