Skip to content
4 changes: 4 additions & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ type Index interface {
// Size of the index on disk, if applicable.
DiskSizeBytes() int64

// TagValueCacheBytes is the size of tag value cache for TSI indexes.
// This is only to be used with TSI.
TagValueCacheBytes() int64

// Bytes estimates the memory footprint of this Index, in bytes.
Bytes() int

Expand Down
4 changes: 4 additions & 0 deletions tsdb/index/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,10 @@ func (i *Index) SeriesIDIterator(opt query.IteratorOptions) (tsdb.SeriesIDIterat
// DiskSizeBytes always returns zero bytes, since this is an in-memory index.
func (i *Index) DiskSizeBytes() int64 {
	// Nothing is ever persisted to disk for the in-memory index.
	return 0
}

// TagValueCacheBytes always returns zero bytes, since the in-memory index
// does not use a tag value series ID cache.
func (i *Index) TagValueCacheBytes() int64 {
	// The tag value series ID cache is a TSI-only structure.
	return 0
}

// Rebuild recreates the measurement indexes to allow deleted series to be removed
// and garbage collected.
func (i *Index) Rebuild() {
Expand Down
24 changes: 24 additions & 0 deletions tsdb/index/tsi1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsi1
import (
"container/list"
"sync"
"unsafe"

"github.com/influxdata/influxdb/tsdb"
)
Expand Down Expand Up @@ -191,6 +192,29 @@ func (c *TagValueSeriesIDCache) checkEviction() {
}
}

// HeapSize estimates the total heap memory usage of the cache in bytes.
func (c *TagValueSeriesIDCache) HeapSize() int {
c.RLock()
defer c.RUnlock()

size := int(unsafe.Sizeof(*c))
for name, mmap := range c.cache {
size += len(name) + int(unsafe.Sizeof(mmap))
for key, tkmap := range mmap {
size += len(key) + int(unsafe.Sizeof(tkmap))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For strings like key, is len(key) the size used, or is it unsafe.Sizeof(key) + len(key)

for value, ele := range tkmap {
size += len(value) + int(unsafe.Sizeof(ele))
elem := ele.Value.(*seriesIDCacheElement)
size += len(elem.name) + len(elem.key) + len(elem.value)
if elem.SeriesIDSet != nil {
size += elem.SeriesIDSet.Bytes()
}
}
}
}
return size
}

// seriesIDCacheElement is an item stored within a cache.
type seriesIDCacheElement struct {
name string
Expand Down
48 changes: 48 additions & 0 deletions tsdb/index/tsi1/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/influxdata/influxdb/tsdb"
"github.com/stretchr/testify/require"
)

// This function is used to log the components of disk size when DiskSizeBytes fails
Expand Down Expand Up @@ -150,6 +151,53 @@ func TestTagValueSeriesIDCache_eviction(t *testing.T) {
cache.Has(t, "m3", "k0", "v0", m3k0v0)
}

// TestTagValueSeriesIDCache_HeapSize verifies that HeapSize reports a
// positive size for an empty cache and grows monotonically as entries are
// added. Eviction shrinkage and nil-set handling are covered by the
// dedicated tests below.
func TestTagValueSeriesIDCache_HeapSize(t *testing.T) {
	cache := TestCache{NewTagValueSeriesIDCache(10)}

	// Even an empty cache carries fixed struct overhead.
	emptySize := cache.HeapSize()
	require.Positive(t, emptySize)

	cache.PutByString("m0", "k0", "v0", tsdb.NewSeriesIDSet(1, 2, 3))
	sizeAfterOne := cache.HeapSize()
	require.Greater(t, sizeAfterOne, emptySize)

	cache.PutByString("m0", "k0", "v1", tsdb.NewSeriesIDSet(100, 200, 300, 400, 500))
	sizeAfterTwo := cache.HeapSize()
	require.Greater(t, sizeAfterTwo, sizeAfterOne)

	cache.PutByString("m1", "k0", "v0", tsdb.NewSeriesIDSet(1000))
	sizeAfterThree := cache.HeapSize()
	require.Greater(t, sizeAfterThree, sizeAfterTwo)
}

// TestTagValueSeriesIDCache_HeapSize_eviction verifies that evicting a large
// entry and replacing it with a small one reduces the reported HeapSize.
func TestTagValueSeriesIDCache_HeapSize_eviction(t *testing.T) {
	cache := TestCache{NewTagValueSeriesIDCache(2)}

	// Fill both cache slots with large series ID sets.
	ids := make([]uint64, 1000)
	for i := 0; i < len(ids); i++ {
		ids[i] = uint64(i)
	}
	cache.PutByString("m0", "k0", "v0", tsdb.NewSeriesIDSet(ids...))
	cache.PutByString("m0", "k0", "v1", tsdb.NewSeriesIDSet(ids...))
	before := cache.HeapSize()

	// A third entry evicts one of the large sets in favor of a tiny one,
	// so the reported size must drop.
	cache.PutByString("m0", "k0", "v2", tsdb.NewSeriesIDSet(1))
	require.Less(t, cache.HeapSize(), before)
}

// TestTagValueSeriesIDCache_HeapSize_nil_set verifies that a nil SeriesIDSet
// entry is counted without panicking.
func TestTagValueSeriesIDCache_HeapSize_nil_set(t *testing.T) {
	c := TestCache{NewTagValueSeriesIDCache(10)}

	// A nil set must be tolerated by the size walk.
	c.PutByString("m0", "k0", "v0", nil)
	require.Positive(t, c.HeapSize())
}

func TestTagValueSeriesIDCache_addToSet(t *testing.T) {
cache := TestCache{NewTagValueSeriesIDCache(4)}
cache.PutByString("m0", "k0", "v0", nil) // Puts a nil set in the cache.
Expand Down
51 changes: 51 additions & 0 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ type Index struct {

// Number of partitions used by the index.
PartitionN uint64

// Number of bytes cache is currently using.
// Updated periodically by CollectTagValueCacheMetrics goroutine.
cacheBytes int64

// Closed when the index is closing, to signal background goroutines to stop.
closing chan struct{}
}

func (i *Index) UniqueReferenceID() uintptr {
Expand All @@ -180,6 +187,7 @@ func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *
sSketch: hll.NewDefaultPlus(),
sTSketch: hll.NewDefaultPlus(),
PartitionN: DefaultPartitionN,
closing: make(chan struct{}),
}

for _, option := range options {
Expand Down Expand Up @@ -260,6 +268,9 @@ func (i *Index) Open() (rErr error) {
return errors.New("index already open")
}

// Re-initialize closing channel for reopen support.
i.closing = make(chan struct{})

// Ensure root exists.
if err := os.MkdirAll(i.path, 0777); err != nil {
return err
Expand Down Expand Up @@ -303,6 +314,10 @@ func (i *Index) Open() (rErr error) {
// Mark opened.
i.opened = true
i.logger.Info(fmt.Sprintf("index opened with %d partitions", partitionN))

// Start background goroutine to periodically collect cache metrics.
i.collectTagValueCacheMetrics()

return nil
}

Expand Down Expand Up @@ -350,6 +365,14 @@ func (i *Index) Close() error {

// close closes the index without locking
func (i *Index) close() (rErr error) {
// Signal background goroutines to stop.
select {
case <-i.closing:
// Already closed.
default:
close(i.closing)
}

for _, p := range i.partitions {
if (p != nil) && p.IsOpen() {
if pErr := p.Close(); pErr != nil {
Expand Down Expand Up @@ -1062,6 +1085,34 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator,
return tsdb.MergeSeriesIDIterators(a...), nil
}

// TagValueCacheBytes returns the most recently sampled heap size of the
// tag value series ID cache, in bytes.
func (i *Index) TagValueCacheBytes() int64 {
	// cacheBytes is written by the background metrics goroutine, so it
	// must be read atomically.
	n := atomic.LoadInt64(&i.cacheBytes)
	return n
}

// collectTagValueCacheMetrics starts a background goroutine that periodically
// samples the tag value cache heap size. It exits when the index is closed.
func (i *Index) collectTagValueCacheMetrics() {
// take an initial sample
atomic.StoreInt64(&i.cacheBytes, int64(i.tagValueCache.HeapSize()))

const cacheTrigger = 10 * time.Second
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 second updates should be fine 🤔

Copy link
Contributor

@davidby-influx davidby-influx Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why collect on a schedule other than Monitor? Run the code when there is a call to Shard.Statistics like we do for SeriesN. This lets the Monitor service control frequency of collection, and reduces complexity on start and stop.

closing := i.closing
go func(closing <-chan struct{}) {
ticker := time.NewTicker(cacheTrigger)
defer ticker.Stop()
for {
select {
case <-closing:
return
case <-ticker.C:
atomic.StoreInt64(&i.cacheBytes, int64(i.tagValueCache.HeapSize()))
}
}
}(closing)
}

// TagValueSeriesIDIterator returns a series iterator for a single tag value.
func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
// Check series ID set cache...
Expand Down
28 changes: 23 additions & 5 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ var (

// Statistics gathered by the store. These keys appear in the Values map of
// the per-database models.Statistic records emitted by Store.Statistics.
const (
statDatabaseSeries = "numSeries" // number of series in a database
statDatabaseMeasurements = "numMeasurements" // number of measurements in a database
statTagValueCacheBytes = "tagValueCacheBytes" // bytes used by the tag value series ID cache
statPointsWritten = "pointsWritten" // number of points parsed by engines successfully
statValuesWritten = "valuesWritten" // number of values parsed by engines successfully
statSeriesCreated = "seriesCreated" // number of series created since startup
)

// SeriesFileDirectory is the name of the directory containing series files for
Expand Down Expand Up @@ -200,6 +201,22 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
s.mu.RLock()
shards := s.shardsSlice()
s.mu.RUnlock()

// Collect tag value cache bytes from unique indexes, grouped by database.
dbCacheBytes := make(map[string]int64)
seenIndexes := make(map[uintptr]bool)
for _, sh := range shards {
idx, err := sh.Index()
if err != nil || idx == nil {
continue
}
id := idx.UniqueReferenceID()
if !seenIndexes[id] {
seenIndexes[id] = true
dbCacheBytes[sh.Database()] += idx.TagValueCacheBytes()
}
}

// Add all the series and measurements cardinality estimations.
databases := s.Databases()
statistics := make([]models.Statistic, 0, len(databases))
Expand All @@ -223,6 +240,7 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
Values: map[string]interface{}{
statDatabaseSeries: sc,
statDatabaseMeasurements: mc,
statTagValueCacheBytes: dbCacheBytes[database],
},
})
}
Expand Down