
How to Partition Data in KDB.AI

This page explains how to partition data within KDB.AI.

Tip: For the best experience, start by reading about KDB.AI partitioning.

KDB.AI supports partitioning for tables with any of the following indexes:

  • dense Flat index
  • dense qFlat index
  • dense HNSW index
  • dense qHNSW index
  • sparse index
  • TSS index
  • any combination of the above indexes

Setup

Before starting, you must have a running KDB.AI server and the kdbai_client Python package installed.
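
If you don't have the client yet, it is published on PyPI (package name kdbai-client):

pip install kdbai-client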

Import dependencies

Import the following dependencies:

import sys
import kdbai_client as kdbai
from pprint import pprint # for pretty printing
import pandas as pd
import numpy as np

Partition a table on any metadata column

Create table

Create a table, specifying the metadata column to partition on:

schema = [
    {'name': 'id', 'type': 'int16'},
    {'name': 'sym', 'type': 'str'},
    {'name': 'date', 'type': 'datetime64[D]'},
    {'name': 'embeddings', 'type': 'float32s'},
    {'name': 'sparse', 'type': 'general'}
]
indexes = [
    {'name': 'flat_index', 'column': 'embeddings', 'type': 'flat', 'params': {'dims': 25}},
    {'name': 'hnsw_fast', 'column': 'embeddings', 'type': 'hnsw', 'params': {'dims': 25, 'M': 8, 'efConstruction': 8}},
    {'name': 'sparse_index', 'column': 'sparse', 'type': 'bm25', 'params': {'k': 1.25, 'b': 0.75}}
]

# Local server
session = kdbai.Session(endpoint='http://localhost:8082')

# Get the database connection. Default database name is 'default'
database = session.database('default')

table = database.create_table('example', schema=schema, indexes=indexes, partition_column='date')

The same table can be created over the q IPC interface:

schema: flip `name`type!(`id`sym`date`embeddings`sparse;`h`s`d`E`)
flatIndex: `name`column`type`params!(`flat_index;`embeddings;`flat;enlist[`dims]!enlist 25)
hnswFast: `name`column`type`params!(`hnsw_fast;`embeddings;`hnsw;`dims`M`efConstruction!(25;8;8))
sparseIndex: `name`column`type`params!(`sparse_index;`sparse;`bm25;`k`b!(1.25;0.75))
indexes: (flatIndex;hnswFast;sparseIndex)
gw: hopen 8082;
gw(`createTable;`database`table`schema`indexes`partitionColumn!(`default;`example;schema;indexes;`date))

For the REST API, save the equivalent JSON payload to a file (for example, schemaAbove.json):

{
    "table": "example",
    "schema": [
        {"name": "id", "type": "short"},
        {"name": "sym", "type": "symbol"},
        {"name": "date", "type": "date"},
        {"name": "embeddings", "type": "reals"},
        {"name": "sparse", "type": "general"}
    ],
    "indexes": [
        {"name": "flat_index", "column": "embeddings", "type": "flat", "params": {"dims": 25}},
        {"name": "hnsw_fast", "column": "embeddings", "type": "hnsw", "params": {"dims": 25, "M": 8, "efConstruction": 8}},
        {"name": "sparse_index", "column": "sparse", "type": "bm25", "params": {"k": 1.25, "b": 0.75}}
    ],
    "partitionColumn": "date"
}
curl -X POST -H "Content-Type: application/json" -d @schemaAbove.json localhost:8081/api/v2/databases/default/tables

The system determines the number of partitions automatically from the values in the partition column; you do not need to (and should not) specify it yourself.

Result: The table is created partitioned on the values of the specified metadata column.

Example of Python code to partition by the sym column instead (using a new table name, since example already exists):

table_partitioned_by_sym = database.create_table('example_sym', schema=schema, indexes=indexes, partition_column='sym')
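
To confirm both tables exist, you can list the tables registered in the database. This is a quick check assuming the database handle from the setup above; the tables property and name attribute follow the v2 kdbai_client interface:

# List the tables in the 'default' database (tables/name assumed from the v2 client)
pprint([t.name for t in database.tables])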

Insert data

Add partitions by inserting data.

row_count = 1000
df = pd.DataFrame({
    'id': np.arange(row_count, dtype=np.int16),  # match the int16 schema type
    'sym': np.random.choice(['AAA', 'BBB', 'CCC', 'DDD'], row_count),
    'date': np.random.choice(pd.date_range(start='2021-01-01', periods=row_count // 4, freq='1D'), row_count),
    'embeddings': list(np.random.rand(row_count, 25).astype(np.float32)),
    'sparse': [{np.random.randint(1, 1000): np.random.rand() for _ in range(np.random.randint(1, 10))} for _ in range(row_count)]  # at least one term per document
})
table.insert(df)

A two-row sample insert over the q IPC interface:

id: 21212 19376h
sym: `aaa`bbb
date: 2023.10.11 2023.10.11
embeddings: (25?1e; 25?1e)
sparse: ((1996 101 11190 5598 2058 4231 102!2 1 1 1 1 1 1);(1996 5598 2058 4231 102!1 1 1 1 1))

data: `id`sym`date`embeddings`sparse!(id;sym;date;embeddings;sparse)
gw: hopen 8082;
gw(`insertData;`database`table`payload!(`default;`example;data))

For the REST API, the same two rows as a JSON payload (saved as insert.json):

{
  "rows": [
    {
      "id": 21212,
      "sym": "aaa",
      "date": "2023-10-11T00:00:00.000000000",
      "embeddings": [0.439081, 0.5759051, 0.5919004, 0.8481566, 0.389056, 0.391543, 0.08123546, 0.9367504, 0.2782122, 0.2392341, 0.1508133, 0.1567317, 0.9785, 0.7043314, 0.9441671, 0.7833686, 0.4099561, 0.6108817, 0.4976492, 0.4087545, 0.449731, 0.01392076, 0.714878, 0.1946509, 0.09059025],
      "sparse": {"1996": 2, "101": 1, "11190": 1, "5598": 1, "2058": 1, "4231": 1, "102": 1}
    },
    {
      "id": 19376,
      "sym": "bbb",
      "date": "2023-10-11T00:00:00.000000000",
      "embeddings": [0.6203014, 0.9326316, 0.2747066, 0.05752515, 0.2560658, 0.2310108, 0.08724017, 0.1024432, 0.8671097, 0.7278528, 0.1627662, 0.6884756, 0.8177547, 0.7520102, 0.1086824, 0.9598964, 0.03668341, 0.6430982, 0.6708738, 0.6789082, 0.412317, 0.9877844, 0.3867353, 0.726781, 0.4046546],
      "sparse": {"1996": 1, "5598": 1, "2058": 1, "4231": 1, "102": 1}
    }
  ]
}
curl -H 'Content-Type: application/json' -d @insert.json localhost:8081/api/v2/databases/default/tables/example/insert
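
To sanity-check the insert from Python, you can read the data back; with no arguments, the v2 client's query returns the full table as a DataFrame (assuming the table handle from above):

# Pull everything back and confirm all 1,000 rows landed
print(len(table.query()))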

Query partitioned data

You can perform searches and queries using filters on the partition column. This keeps queries efficient by limiting the scan to the relevant partitions. For instance, use this query to count the rows in each partition:

table.query(aggs={'cnt': ['count', 'id']}, group_by=['date'])

The same query in q:

gw(`query;`database`table`aggs`groupBy!(`default;`example;enlist[`cnt]!enlist[`count`id];enlist[`date]))

Or via the REST API:

curl -s -H "Content-Type: application/json" localhost:8081/api/v2/databases/default/tables/example/query \
-d '{"aggs":{"cnt":["count","id"]}, "groupBy":["date"]}'

Best practices

  • Consistent Partitioning Strategy: Maintain a consistent approach to partitioning to simplify data management and ensure optimal performance.

  • Monitor Performance: Consistently track query performance and modify partitioning strategies as necessary.

  • Data Archiving: Implement archiving strategies for older partitions to manage storage effectively and keep the system performant (see the sketch after this list).
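
As a sketch of the archiving idea, you could export an old partition's rows before removing them. The filter form and the string date literal here are assumptions; check your KDB.AI version for the supported row-deletion or table-rebuild workflow:

# Pull all rows older than the cutoff date (filter triple: [operator, column, value])
old_rows = table.query(filter=[['<', 'date', '2021-02-01']])

# Archive to columnar storage before deleting or rebuilding the table
old_rows.to_parquet('archive_before_2021-02-01.parquet')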