Windowed Dataset Examples

(ns windowed-dataset-examples
  "Examples demonstrating windowed dataset functionality for streaming data analysis."
  (:require [scicloj.windowed-dataset.api :as wd]
            [tablecloth.api :as tc]
            [tablecloth.column.api :as tcc]
            [java-time.api :as java-time]
            [scicloj.kindly.v4.kind :as kind]
            [tech.v3.datatype.functional :as dfn]))

This notebook shows how to use windowed datasets for streaming data analysis. We’ll work with temperature sensor data as a relatable example, but the same patterns apply to any time-series data.

The Problem: Streaming Data with Limited Memory

Imagine you’re processing temperature readings from sensors. New readings arrive continuously, but you only care about recent trends and don’t want to store years of historical data in memory.

Windowed datasets solve this by maintaining a “sliding window” of recent data.

Basic Setup

Let’s define our data structure: timestamp, temperature, and sensor location

(def column-types
  {:timestamp :instant
   :temperature :float64
   :location :string})

Create a windowed dataset that keeps only the last 5 readings

(def temperature-buffer
  (wd/make-windowed-dataset column-types 5))

Initial state - empty buffer:

temperature-buffer

{

:dataset

_unnamed [5 3]:

:timestamp :temperature :location
0.0
0.0
0.0
0.0
0.0
:column-types {:timestamp :instant, :temperature :float64, :location :string}
:max-size 5
:current-size 0
:current-position 0

}

Simulating Streaming Data

Create sample temperature readings (like data from a greenhouse monitoring system)

(def sample-readings
  (let [start-time (java-time/instant)]
    (map (fn [i]
           {:timestamp (java-time/plus start-time (java-time/seconds (* i 30)))
            :temperature (+ 22.0 (* 2.0 (Math/sin (/ i 3.0)))) ; Simulated smooth variation around 22.0
            :location "greenhouse-a"})
         (range 10))))

Sample data - first 5 readings:

(tc/dataset (take 5 sample-readings))

_unnamed [5 3]:

:timestamp :temperature :location
2025-08-07T23:15:11.770708Z 22.00000000 greenhouse-a
2025-08-07T23:15:41.770708Z 22.65438939 greenhouse-a
2025-08-07T23:16:11.770708Z 23.23673961 greenhouse-a
2025-08-07T23:16:41.770708Z 23.68294197 greenhouse-a
2025-08-07T23:17:11.770708Z 23.94387580 greenhouse-a

Watching the Window in Action

Let’s insert data one by one and watch how the window behaves

(defn show-window-evolution []
  (let [results (atom [])]
    (reduce (fn [buffer reading]
              (let [updated-buffer (wd/insert-to-windowed-dataset! buffer reading)
                    current-data (wd/windowed-dataset->dataset updated-buffer)]
                (swap! results conj
                       {:reading-number (count @results)
                        :buffer-size (:current-size updated-buffer)
                        :temperatures (vec (:temperature current-data))})
                updated-buffer))
            temperature-buffer
            (take 8 sample-readings))
    @results))

Window evolution as data arrives:

(tc/dataset (show-window-evolution))

_unnamed [8 3]:

:reading-number :buffer-size :temperatures
0 1 [22.0]
1 2 [22.0 22.654389393592304]
2 3 [22.0 22.654389393592304 23.236739606139473]
3 4 [22.0 22.654389393592304 23.236739606139473 23.682941969615793]
4 5 [22.0 22.654389393592304 23.236739606139473 23.682941969615793 23.943875802726627]
5 5 [22.654389393592304 23.236739606139473 23.682941969615793 23.943875802726627 23.99081591550353]
6 5 [23.236739606139473 23.682941969615793 23.943875802726627 23.99081591550353 23.818594853651362]
7 5 [23.682941969615793 23.943875802726627 23.99081591550353 23.818594853651362 23.44617176347665]

Notice how:

  • The buffer grows until it reaches maximum size (5)
  • After that, old data is automatically discarded to make room for new data
  • Memory usage stays constant regardless of how much data we process
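
The behavior above can be sketched with a small ring buffer in plain Clojure. This is only an illustration of the idea, not the library's implementation: the names make-ring, ring-insert, and ring->vec are hypothetical, and the fields simply mirror the :max-size, :current-size, and :current-position keys printed in the buffer state earlier.

```clojure
;; Illustrative ring buffer (hypothetical names; not the library's code).
(defn make-ring [max-size]
  {:slots (vec (repeat max-size nil))   ; storage is allocated once, up front
   :max-size max-size
   :current-size 0
   :current-position 0})

(defn ring-insert
  "Write x at the current position, overwriting the oldest value once full."
  [{:keys [max-size current-size current-position] :as ring} x]
  (-> ring
      (assoc-in [:slots current-position] x)
      (assoc :current-size (min max-size (inc current-size)))
      (assoc :current-position (mod (inc current-position) max-size))))

(defn ring->vec
  "Return the stored values, oldest first."
  [{:keys [slots max-size current-size current-position]}]
  ;; Until the ring is full the oldest value sits in slot 0;
  ;; afterwards it sits at the next write position.
  (let [start (if (< current-size max-size) 0 current-position)]
    (mapv #(nth slots (mod (+ start %) max-size))
          (range current-size))))

(def ring (reduce ring-insert (make-ring 5) (range 8)))

(ring->vec ring)
;; => [3 4 5 6 7]
```

Eight values went in, but only the five most recent remain, and the storage vector never grew past five slots.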

Time-Based Windows

Often you don’t just want the last N readings - you want “all data from the last X minutes”

Fill our buffer with all sample data

(def full-buffer
  (reduce wd/insert-to-windowed-dataset! temperature-buffer sample-readings))

All data in buffer:

(wd/windowed-dataset->dataset full-buffer)

_unnamed [5 3]:

:timestamp :temperature :location
2025-08-07T23:17:41.770708976Z 23.99081592 greenhouse-a
2025-08-07T23:18:11.770708976Z 23.81859485 greenhouse-a
2025-08-07T23:18:41.770708976Z 23.44617176 greenhouse-a
2025-08-07T23:19:11.770708976Z 22.91454525 greenhouse-a
2025-08-07T23:19:41.770708976Z 22.28224002 greenhouse-a

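Extract different time windows. The window length is given in milliseconds and, as the output shows, is measured back from the most recent timestamp in the buffer.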

(let [now (:timestamp (last sample-readings))]
  {:last-90-seconds (wd/windowed-dataset->time-window-dataset full-buffer :timestamp 90000)
   :last-150-seconds (wd/windowed-dataset->time-window-dataset full-buffer :timestamp 150000)
   :reference-time now})

{

:last-90-seconds

_unnamed [4 3]:

:timestamp :temperature :location
2025-08-07T23:18:11.770708976Z 23.81859485 greenhouse-a
2025-08-07T23:18:41.770708976Z 23.44617176 greenhouse-a
2025-08-07T23:19:11.770708976Z 22.91454525 greenhouse-a
2025-08-07T23:19:41.770708976Z 22.28224002 greenhouse-a
:last-150-seconds

_unnamed [5 3]:

:timestamp :temperature :location
2025-08-07T23:17:41.770708976Z 23.99081592 greenhouse-a
2025-08-07T23:18:11.770708976Z 23.81859485 greenhouse-a
2025-08-07T23:18:41.770708976Z 23.44617176 greenhouse-a
2025-08-07T23:19:11.770708976Z 22.91454525 greenhouse-a
2025-08-07T23:19:41.770708976Z 22.28224002 greenhouse-a
:reference-time #inst "2025-08-07T23:19:41.770708976-00:00"

}
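
The selection rule suggested by this output can be sketched in plain Clojure. The function last-millis and the demo-rows data are hypothetical, and the inclusive lower bound is an assumption inferred from the four rows returned for the 90-second window. It is not the library's code.

```clojure
(import '(java.time Instant))

(defn last-millis
  "Keep the rows whose :timestamp lies within window-ms milliseconds of the
  latest :timestamp in rows (boundary inclusive, by assumption)."
  [rows window-ms]
  (when (seq rows)
    (let [latest (apply max (map #(.toEpochMilli ^Instant (:timestamp %)) rows))
          cutoff (- latest window-ms)]
      (filterv #(<= cutoff (.toEpochMilli ^Instant (:timestamp %))) rows))))

;; Five readings spaced 30 seconds apart, like the buffer above.
(def demo-rows
  (let [t0 (Instant/parse "2025-01-01T00:00:00Z")]
    (mapv (fn [i] {:timestamp (.plusSeconds t0 (* 30 i)) :reading i})
          (range 5))))

(mapv :reading (last-millis demo-rows 90000))
;; => [1 2 3 4]
```

A 150000 ms window covers the whole 120-second span of demo-rows, so it returns all five rows, just as the 150-second window does in the output above.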

Real-World Pattern: Streaming Analytics

Here’s how you might calculate a running average temperature in a real application

(defn calculate-average-temperature
  "Calculate the current average temperature from the buffer."
  [buffer]
  (let [current-data (wd/windowed-dataset->dataset buffer)
        temperatures (:temperature current-data)]
    (when (seq temperatures)
      (dfn/mean temperatures))))

Simulate processing readings as they arrive, calculating moving averages

(defn streaming-analytics-demo []
  (let [results (atom [])]
    (reduce (fn [buffer reading]
              (let [updated-buffer (wd/insert-to-windowed-dataset! buffer reading)
                    avg-temp (calculate-average-temperature updated-buffer)]
                (swap! results conj
                       {:timestamp (:timestamp reading)
                        :current-temperature (:temperature reading)
                        :running-average avg-temp
                        :readings-in-buffer (:current-size updated-buffer)})
                updated-buffer))
            temperature-buffer
            sample-readings)
    @results))

Streaming analytics results:

(tc/dataset (streaming-analytics-demo))

_unnamed [10 4]:

:timestamp :current-temperature :running-average :readings-in-buffer
2025-08-07T23:15:11.770708Z 22.00000000 22.00000000 1
2025-08-07T23:15:41.770708Z 22.65438939 22.32719470 2
2025-08-07T23:16:11.770708Z 23.23673961 22.63037633 3
2025-08-07T23:16:41.770708Z 23.68294197 22.89351774 4
2025-08-07T23:17:11.770708Z 23.94387580 23.10358935 5
2025-08-07T23:17:41.770708Z 23.99081592 23.50175254 5
2025-08-07T23:18:11.770708Z 23.81859485 23.73459363 5
2025-08-07T23:18:41.770708Z 23.44617176 23.77648006 5
2025-08-07T23:19:11.770708Z 22.91454525 23.62280072 5
2025-08-07T23:19:41.770708Z 22.28224002 23.29047356 5
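
As a cross-check, the :running-average column can be reproduced without the library. The sketch below recomputes the temperatures with the same formula as sample-readings and averages the last five values at each step. The names temps, moving-averages, and averages are hypothetical helpers introduced only for this check.

```clojure
;; Same formula as sample-readings, recomputed so the sketch is self-contained.
(def temps
  (mapv #(+ 22.0 (* 2.0 (Math/sin (/ % 3.0)))) (range 10)))

(defn moving-averages
  "For each index i of the vector xs, the mean of the last n elements
  up to and including i."
  [n xs]
  (mapv (fn [i]
          (let [window (subvec xs (max 0 (- (inc i) n)) (inc i))]
            (/ (reduce + window) (double (count window)))))
        (range (count xs))))

(def averages (moving-averages 5 temps))

(nth averages 5) ; approximately 23.50175, the sixth row of the table
(nth averages 9) ; approximately 23.29047, the last row of the table
```

The first four values average fewer than five readings, which is why :readings-in-buffer climbs from 1 to 5 before levelling off.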

Historical Analysis: Adding Progressive Features

Sometimes you have historical data and want to see how metrics would have evolved over time, as if you were processing it in real-time.

(def historical-temperatures
  (tc/dataset {:timestamp (map #(java-time/plus (java-time/instant)
                                                (java-time/seconds %))
                               (range 12))
               :temperature (map #(+ 20.0 (* 3.0 (Math/sin (/ % 2.0))))
                                 (range 12))}))

Original historical data:

historical-temperatures

_unnamed [12 2]:

:timestamp :temperature
2025-08-07T23:15:11.797181Z 20.00000000
2025-08-07T23:15:12.797228Z 21.43827662
2025-08-07T23:15:13.797230Z 22.52441295
2025-08-07T23:15:14.797232Z 22.99248496
2025-08-07T23:15:15.797233Z 22.72789228
2025-08-07T23:15:16.797234Z 21.79541643
2025-08-07T23:15:17.797236Z 20.42336002
2025-08-07T23:15:18.797237Z 18.94765032
2025-08-07T23:15:19.797239Z 17.72959251
2025-08-07T23:15:20.797241Z 17.06740965
2025-08-07T23:15:21.797242Z 17.12322718
2025-08-07T23:15:22.797243Z 17.88337902

Add a progressive moving average column

(def with-moving-average
  (wd/add-column-by-windowed-fn
   historical-temperatures
   {:colname :progressive-average
    :windowed-fn calculate-average-temperature
    :windowed-dataset-size 120}))

Historical data with progressive moving averages:

with-moving-average

_unnamed [12 3]:

:timestamp :temperature :progressive-average
2025-08-07T23:15:11.797181Z 20.00000000
2025-08-07T23:15:12.797228Z 21.43827662 20.00000000
2025-08-07T23:15:13.797230Z 22.52441295 20.71913831
2025-08-07T23:15:14.797232Z 22.99248496 21.32089652
2025-08-07T23:15:15.797233Z 22.72789228 21.73879363
2025-08-07T23:15:16.797234Z 21.79541643 21.93661336
2025-08-07T23:15:17.797236Z 20.42336002 21.91308054
2025-08-07T23:15:18.797237Z 18.94765032 21.70026332
2025-08-07T23:15:19.797239Z 17.72959251 21.35618670
2025-08-07T23:15:20.797241Z 17.06740965 20.95323179
2025-08-07T23:15:21.797242Z 17.12322718 20.56464957
2025-08-07T23:15:22.797243Z 17.88337902 20.25179299

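Notice that each row's average is computed from the rows that precede it: the first row has no value (there is no earlier data), the second row shows the first reading, and later rows show a true average of the readings before them. Because the window size (120) exceeds the 12 rows of data, nothing is ever evicted, so the column is effectively a cumulative average of all earlier readings.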
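
The pattern visible in the table, where each row's value depends only on earlier rows, can be sketched in plain Clojure. The function lagged-averages is a hypothetical illustration of that pattern, not how add-column-by-windowed-fn is implemented.

```clojure
(defn lagged-averages
  "For each index i of the vector xs, the mean of up to n elements
  strictly before i, or nil when there are none."
  [n xs]
  (mapv (fn [i]
          (let [window (subvec xs (max 0 (- i n)) i)]
            (when (seq window)
              (/ (reduce + window) (double (count window))))))
        (range (count xs))))

(lagged-averages 120 [20.0 22.0 24.0])
;; => [nil 20.0 21.0]

(lagged-averages 2 [1.0 2.0 3.0 4.0])
;; => [nil 1.0 1.5 2.5]
```

Excluding the current row means the feature at each point uses only information that would already have been available when that row arrived.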

Memory Efficiency Demo

Let’s check that a windowed dataset never holds more rows than its capacity, no matter how much data passes through it

(defn memory-efficiency-test []
  (let [;; Create lots of data
        lots-of-data (map (fn [i]
                            {:timestamp (java-time/plus (java-time/instant)
                                                        (java-time/millis i))
                             :temperature (+ 20.0 (rand 10.0))
                             :location "test-sensor"})
                          (range 1000))

        ;; Create two buffers of different sizes
        small-buffer (wd/make-windowed-dataset column-types 10)
        large-buffer (wd/make-windowed-dataset column-types 100)]

    ;; Process all data through both buffers
    (let [final-small (reduce wd/insert-to-windowed-dataset! small-buffer lots-of-data)
          final-large (reduce wd/insert-to-windowed-dataset! large-buffer lots-of-data)]

      {:input-data-size (count lots-of-data)
       :small-buffer-final-size (:current-size final-small)
       :large-buffer-final-size (:current-size final-large)
       :memory-usage "Constant - only depends on buffer size, not input size!"})))

Memory efficiency test:

(memory-efficiency-test)
{:input-data-size 1000,
 :small-buffer-final-size 10,
 :large-buffer-final-size 100,
 :memory-usage
 "Constant - only depends on buffer size, not input size!"}

Practical Applications

Windowed datasets are perfect for:

  1. Real-time monitoring - Track recent system metrics, sensor readings, or user activity
  2. Streaming alerts - Detect anomalies based on recent patterns without storing everything
  3. Live dashboards - Show current trends and recent history with bounded memory
  4. IoT data processing - Handle continuous sensor streams efficiently
  5. Financial analysis - Calculate technical indicators on streaming market data
  6. ML feature engineering - Create time-based features for online learning models

Key Benefits:

  • Predictable memory usage - Never grows beyond your specified window size
  • Efficient time queries - Binary search makes time-based filtering fast
  • Streaming-friendly - Designed for continuous data processing
  • Seamless integration - Works naturally with existing Clojure data tools
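
To illustrate the binary-search point, here is a minimal lower-bound search over sorted timestamps in plain Clojure. It is a sketch of the general technique, assuming timestamps are stored in ascending order as epoch milliseconds. The function lower-bound and the sample millis vector are hypothetical, and the library's actual implementation may differ.

```clojure
(defn lower-bound
  "Index of the first element of the sorted vector ts that is >= cutoff,
  or (count ts) when every element is smaller."
  [ts cutoff]
  (loop [lo 0
         hi (count ts)]
    (if (< lo hi)
      (let [mid (quot (+ lo hi) 2)]
        (if (< (nth ts mid) cutoff)
          (recur (inc mid) hi)   ; answer lies to the right of mid
          (recur lo mid)))       ; mid itself may be the answer
      lo)))

(def millis [0 30000 60000 90000 120000])

;; Everything from the last 90 seconds, relative to the latest timestamp.
(subvec millis (lower-bound millis (- (peek millis) 90000)))
;; => [30000 60000 90000 120000]
```

Finding the cutoff takes a logarithmic number of comparisons, and subvec returns a view of the original vector without copying it.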

Summary

Windowed datasets provide a simple but powerful abstraction for handling streaming time-series data. By maintaining only recent data, you can build efficient real-time analytics systems that don’t consume unbounded memory.

The key insight is that many analyses only need recent context, not complete history. Windowed datasets make this pattern explicit and efficient.

source: notebooks/windowed_dataset_examples.clj