Windowed Dataset Examples
(ns windowed-dataset-examples
  "Examples demonstrating windowed dataset functionality for streaming data analysis."
  (:require [scicloj.windowed-dataset.api :as wd]
            [tablecloth.api :as tc]
            [tablecloth.column.api :as tcc]
            [java-time.api :as java-time]
            [scicloj.kindly.v4.kind :as kind]
            [tech.v3.datatype.functional :as dfn]))
This notebook shows how to use windowed datasets for streaming data analysis. We’ll work with temperature sensor data as a relatable example, but the same patterns apply to any time-series data.
The Problem: Streaming Data with Limited Memory
Imagine you’re processing temperature readings from sensors. New readings arrive continuously, but you only care about recent trends and don’t want to store years of historical data in memory.
Windowed datasets solve this by maintaining a “sliding window” of recent data.
Basic Setup
Let’s define our data structure: timestamp, temperature, and sensor location
(def column-types
  {:timestamp :instant
   :temperature :float64
   :location :string})
Create a windowed dataset that keeps only the last 5 readings
(def temperature-buffer
  (wd/make-windowed-dataset column-types 5))
Initial state - empty buffer:
temperature-buffer
{
_unnamed [5 3]:
:column-types {:timestamp :instant, :temperature :float64, :location :string}
:max-size 5
:current-size 0
:current-position 0
}
Simulating Streaming Data
Create sample temperature readings (like data from a greenhouse monitoring system)
(def sample-readings
  (let [start-time (java-time/instant)]
    (map (fn [i]
           {:timestamp (java-time/plus start-time (java-time/seconds (* i 30)))
            :temperature (+ 22.0 (* 2.0 (Math/sin (/ i 3.0)))) ; Simulated daily variation
            :location "greenhouse-a"})
         (range 10))))
Sample data - first 5 readings:
(tc/dataset (take 5 sample-readings))
_unnamed [5 3]:
:timestamp | :temperature | :location |
---|---|---|
2025-08-07T23:15:11.770708Z | 22.00000000 | greenhouse-a |
2025-08-07T23:15:41.770708Z | 22.65438939 | greenhouse-a |
2025-08-07T23:16:11.770708Z | 23.23673961 | greenhouse-a |
2025-08-07T23:16:41.770708Z | 23.68294197 | greenhouse-a |
2025-08-07T23:17:11.770708Z | 23.94387580 | greenhouse-a |
Watching the Window in Action
Let’s insert data one by one and watch how the window behaves
(defn show-window-evolution []
  (let [results (atom [])]
    (reduce (fn [buffer reading]
              (let [updated-buffer (wd/insert-to-windowed-dataset! buffer reading)
                    current-data (wd/windowed-dataset->dataset updated-buffer)]
                ;; Record a snapshot of the window after each insert
                (swap! results conj
                       {:reading-number (count @results)
                        :buffer-size (:current-size updated-buffer)
                        :temperatures (vec (:temperature current-data))})
                updated-buffer))
            temperature-buffer
            (take 8 sample-readings))
    @results))
Window evolution as data arrives:
(tc/dataset (show-window-evolution))
_unnamed [8 3]:
:reading-number | :buffer-size | :temperatures |
---|---|---|
0 | 1 | [22.0] |
1 | 2 | [22.0 22.654389393592304] |
2 | 3 | [22.0 22.654389393592304 23.236739606139473] |
3 | 4 | [22.0 22.654389393592304 23.236739606139473 23.682941969615793] |
4 | 5 | [22.0 22.654389393592304 23.236739606139473 23.682941969615793 23.943875802726627] |
5 | 5 | [22.654389393592304 23.236739606139473 23.682941969615793 23.943875802726627 23.99081591550353] |
6 | 5 | [23.236739606139473 23.682941969615793 23.943875802726627 23.99081591550353 23.818594853651362] |
7 | 5 | [23.682941969615793 23.943875802726627 23.99081591550353 23.818594853651362 23.44617176347665] |
Notice how:
- The buffer grows until it reaches maximum size (5)
- After that, old data is automatically discarded to make room for new data
- Memory usage stays constant regardless of how much data we process
Time-Based Windows
Often you don’t just want the last N readings - you want “all data from the last X minutes”
Fill our buffer with all sample data
(def full-buffer
  (reduce wd/insert-to-windowed-dataset! temperature-buffer sample-readings))
All data in buffer:
(wd/windowed-dataset->dataset full-buffer)
_unnamed [5 3]:
:timestamp | :temperature | :location |
---|---|---|
2025-08-07T23:17:41.770708976Z | 23.99081592 | greenhouse-a |
2025-08-07T23:18:11.770708976Z | 23.81859485 | greenhouse-a |
2025-08-07T23:18:41.770708976Z | 23.44617176 | greenhouse-a |
2025-08-07T23:19:11.770708976Z | 22.91454525 | greenhouse-a |
2025-08-07T23:19:41.770708976Z | 22.28224002 | greenhouse-a |
Extract different time windows (the last argument is the window length, here in milliseconds)
(let [now (:timestamp (last sample-readings))]
  {:last-90-seconds (wd/windowed-dataset->time-window-dataset full-buffer :timestamp 90000)
   :last-150-seconds (wd/windowed-dataset->time-window-dataset full-buffer :timestamp 150000)
   :reference-time now})
{
:last-90-seconds _unnamed [4 3]
:last-150-seconds _unnamed [5 3]
:reference-time #inst "2025-08-07T23:19:41.770708976-00:00"
}
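To make the row counts above easier to follow, here is a plain-Clojure sketch of a time-window filter over a sequence of reading maps. It is not the library's code: `time-window` is a hypothetical helper, and it assumes the window is measured back from the newest reading with an inclusive boundary, which is consistent with the 4-row and 5-row results shown but is not confirmed by the source.

```clojure
(import '(java.time Instant Duration))

(defn time-window
  "Keep readings whose `time-key` lies within `window-ms` milliseconds of the
  newest reading (boundary included). Hypothetical helper for illustration."
  [readings time-key window-ms]
  (when (seq readings)
    (let [newest (time-key (last readings))
          cutoff (.minus newest (Duration/ofMillis window-ms))]
      (filterv #(not (.isBefore (time-key %) cutoff)) readings))))

;; Five readings spaced 30 seconds apart, like the contents of the full buffer
(def t0 (Instant/parse "2025-01-01T00:00:00Z"))

(def demo-readings
  (mapv (fn [i] {:timestamp (.plusSeconds t0 (* 30 i))
                 :temperature (+ 20.0 i)})
        (range 5)))

(count (time-window demo-readings :timestamp 90000))  ; => 4
(count (time-window demo-readings :timestamp 150000)) ; => 5
```

With readings every 30 seconds, a 90-second window reaches back three intervals and so covers four readings, while a 150-second window is wider than the five readings the buffer holds.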
Real-World Pattern: Streaming Analytics
Here’s how you might calculate a running average temperature in a real application
(defn calculate-average-temperature
  "Calculate the current average temperature from the buffer.
  Returns nil when the buffer is empty."
  [buffer]
  (let [current-data (wd/windowed-dataset->dataset buffer)
        temperatures (:temperature current-data)]
    (when (seq temperatures)
      (dfn/mean temperatures))))
Simulate processing readings as they arrive, calculating moving averages
(defn streaming-analytics-demo []
  (let [results (atom [])]
    (reduce (fn [buffer reading]
              (let [updated-buffer (wd/insert-to-windowed-dataset! buffer reading)
                    avg-temp (calculate-average-temperature updated-buffer)]
                ;; Record the running average after each insert
                (swap! results conj
                       {:timestamp (:timestamp reading)
                        :current-temperature (:temperature reading)
                        :running-average avg-temp
                        :readings-in-buffer (:current-size updated-buffer)})
                updated-buffer))
            temperature-buffer
            sample-readings)
    @results))
Streaming analytics results:
(tc/dataset (streaming-analytics-demo))
_unnamed [10 4]:
:timestamp | :current-temperature | :running-average | :readings-in-buffer |
---|---|---|---|
2025-08-07T23:15:11.770708Z | 22.00000000 | 22.00000000 | 1 |
2025-08-07T23:15:41.770708Z | 22.65438939 | 22.32719470 | 2 |
2025-08-07T23:16:11.770708Z | 23.23673961 | 22.63037633 | 3 |
2025-08-07T23:16:41.770708Z | 23.68294197 | 22.89351774 | 4 |
2025-08-07T23:17:11.770708Z | 23.94387580 | 23.10358935 | 5 |
2025-08-07T23:17:41.770708Z | 23.99081592 | 23.50175254 | 5 |
2025-08-07T23:18:11.770708Z | 23.81859485 | 23.73459363 | 5 |
2025-08-07T23:18:41.770708Z | 23.44617176 | 23.77648006 | 5 |
2025-08-07T23:19:11.770708Z | 22.91454525 | 23.62280072 | 5 |
2025-08-07T23:19:41.770708Z | 22.28224002 | 23.29047356 | 5 |
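The `:running-average` column can be cross-checked without the library. The sketch below recomputes a trailing mean over at most five values in plain Clojure; `mean` and `sliding-means` are hypothetical helpers written for this check, and the temperatures are regenerated with the same formula as `sample-readings`.

```clojure
(defn mean
  "Arithmetic mean of a non-empty collection of numbers."
  [xs]
  (/ (reduce + xs) (double (count xs))))

(defn sliding-means
  "For each position, average the last `n` values seen so far
  (fewer than `n` at the start of the sequence)."
  [n xs]
  (let [v (vec xs)]
    (mapv (fn [i]
            (mean (subvec v (max 0 (- (inc i) n)) (inc i))))
          (range (count v)))))

;; Same formula as the :temperature values in sample-readings
(def demo-temperatures
  (mapv #(+ 22.0 (* 2.0 (Math/sin (/ % 3.0)))) (range 10)))

(sliding-means 5 demo-temperatures)
```

The first result is 22.0 and the last is about 23.2905, matching the first and last rows of the table above to the precision shown.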
Historical Analysis: Adding Progressive Features
Sometimes you have historical data and want to see how metrics would have evolved over time, as if you were processing it in real-time.
(def historical-temperatures
  (tc/dataset {:timestamp (map #(java-time/plus (java-time/instant)
                                                (java-time/seconds %))
                               (range 12))
               :temperature (map #(+ 20.0 (* 3.0 (Math/sin (/ % 2.0))))
                                 (range 12))}))
Original historical data:
historical-temperatures
_unnamed [12 2]:
:timestamp | :temperature |
---|---|
2025-08-07T23:15:11.797181Z | 20.00000000 |
2025-08-07T23:15:12.797228Z | 21.43827662 |
2025-08-07T23:15:13.797230Z | 22.52441295 |
2025-08-07T23:15:14.797232Z | 22.99248496 |
2025-08-07T23:15:15.797233Z | 22.72789228 |
2025-08-07T23:15:16.797234Z | 21.79541643 |
2025-08-07T23:15:17.797236Z | 20.42336002 |
2025-08-07T23:15:18.797237Z | 18.94765032 |
2025-08-07T23:15:19.797239Z | 17.72959251 |
2025-08-07T23:15:20.797241Z | 17.06740965 |
2025-08-07T23:15:21.797242Z | 17.12322718 |
2025-08-07T23:15:22.797243Z | 17.88337902 |
Add a progressive moving average column
(def with-moving-average
  (wd/add-column-by-windowed-fn
   historical-temperatures
   {:colname :progressive-average
    :windowed-fn calculate-average-temperature
    :windowed-dataset-size 120}))
Historical data with progressive moving averages:
with-moving-average
_unnamed [12 3]:
:timestamp | :temperature | :progressive-average |
---|---|---|
2025-08-07T23:15:11.797181Z | 20.00000000 | |
2025-08-07T23:15:12.797228Z | 21.43827662 | 20.00000000 |
2025-08-07T23:15:13.797230Z | 22.52441295 | 20.71913831 |
2025-08-07T23:15:14.797232Z | 22.99248496 | 21.32089652 |
2025-08-07T23:15:15.797233Z | 22.72789228 | 21.73879363 |
2025-08-07T23:15:16.797234Z | 21.79541643 | 21.93661336 |
2025-08-07T23:15:17.797236Z | 20.42336002 | 21.91308054 |
2025-08-07T23:15:18.797237Z | 18.94765032 | 21.70026332 |
2025-08-07T23:15:19.797239Z | 17.72959251 | 21.35618670 |
2025-08-07T23:15:20.797241Z | 17.06740965 | 20.95323179 |
2025-08-07T23:15:21.797242Z | 17.12322718 | 20.56464957 |
2025-08-07T23:15:22.797243Z | 17.88337902 | 20.25179299 |
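Notice that each row's average covers only the readings that came before it: the first row is nil (no earlier data), the second row equals the first reading, and later rows are true averages as more data accumulates. Because the window size (120) exceeds the 12 rows here, nothing is ever dropped from the window.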
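The pattern visible in the `:progressive-average` column, where every row is averaged over the rows preceding it, can be reproduced in a few lines of plain Clojure. This is a sketch inferred from the printed output, not the library's implementation, and `preceding-means` is a hypothetical name.

```clojure
(defn preceding-means
  "For each value, return the mean of all values before it,
  or nil when there are none."
  [xs]
  (let [sums (reductions + 0.0 xs)] ; running sums: 0, x0, x0+x1, ...
    (mapv (fn [i sum]
            (when (pos? i)
              (/ sum i)))
          (range (count xs))
          sums)))

(preceding-means [20.0 22.0 24.0]) ; => [nil 20.0 21.0]
```

Applied to the first two temperatures in the table, 20.0 and 21.43827662, the third entry would be their mean, 20.71913831, which is the value shown in the third row.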
Memory Efficiency Demo
Let’s check that a windowed dataset’s size depends only on its capacity, not on how much data passes through it
(defn memory-efficiency-test []
  (let [;; Create lots of data
        lots-of-data (map (fn [i]
                            {:timestamp (java-time/plus (java-time/instant)
                                                        (java-time/millis i))
                             :temperature (+ 20.0 (rand 10.0))
                             :location "test-sensor"})
                          (range 1000))

        ;; Create two buffers of different sizes
        small-buffer (wd/make-windowed-dataset column-types 10)
        large-buffer (wd/make-windowed-dataset column-types 100)]

    ;; Process all data through both buffers
    (let [final-small (reduce wd/insert-to-windowed-dataset! small-buffer lots-of-data)
          final-large (reduce wd/insert-to-windowed-dataset! large-buffer lots-of-data)]
      {:input-data-size (count lots-of-data)
       :small-buffer-final-size (:current-size final-small)
       :large-buffer-final-size (:current-size final-large)
       :memory-usage "Constant - only depends on buffer size, not input size!"})))
Memory efficiency test:
(memory-efficiency-test)
{:input-data-size 1000,
 :small-buffer-final-size 10,
 :large-buffer-final-size 100,
 :memory-usage
 "Constant - only depends on buffer size, not input size!"}
Practical Applications
Windowed datasets are perfect for:
- Real-time monitoring - Track recent system metrics, sensor readings, or user activity
- Streaming alerts - Detect anomalies based on recent patterns without storing everything
- Live dashboards - Show current trends and recent history with bounded memory
- IoT data processing - Handle continuous sensor streams efficiently
- Financial analysis - Calculate technical indicators on streaming market data
- ML feature engineering - Create time-based features for online learning models
Key Benefits:
- Predictable memory usage - Never grows beyond your specified window size
- Efficient time queries - Binary search makes time-based filtering fast
- Streaming-friendly - Designed for continuous data processing
- Seamless integration - Works naturally with existing Clojure data tools
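The list above mentions binary search for time-based filtering. The source does not show how the library does this internally, so the following is only a generic sketch of the idea: because timestamps arrive in ascending order, the start of a time window can be found in logarithmic time. `lower-bound` is a hypothetical helper, and plain millisecond numbers stand in for timestamps.

```clojure
(defn lower-bound
  "Index of the first element in the sorted vector `v` that is >= `target`,
  or (count v) if no such element exists."
  [v target]
  (loop [lo 0
         hi (count v)]
    (if (< lo hi)
      (let [mid (quot (+ lo hi) 2)]
        (if (neg? (compare (nth v mid) target))
          (recur (inc mid) hi)   ; v[mid] < target: search the right half
          (recur lo mid)))       ; v[mid] >= target: search the left half
      lo)))

;; Ascending timestamps in milliseconds, 30 seconds apart
(def demo-times [0 30000 60000 90000 120000])

;; Everything from the 30-second mark onward
(subvec demo-times (lower-bound demo-times 30000)) ; => [30000 60000 90000 120000]
```

Once the cutoff index is known, taking the window is a single `subvec`, with no need to scan every element.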
Summary
Windowed datasets provide a simple but powerful abstraction for handling streaming time-series data. By maintaining only recent data, you can build efficient real-time analytics systems that don’t consume unbounded memory.
The key insight is that many analyses only need recent context, not complete history. Windowed datasets make this pattern explicit and efficient.