Skip to content

kxi.sp.stats

describe

@StatsOperator
def describe(fields: Union[str, List[str]],
             stats: Union[str, tuple, List]) -> StatsOperator

Computes descriptive statistics on batches

Arguments:

  • fields - A list of column names on which to compute the statistics
  • stats - A list of statistics that should be computed

Returns:

A pipeline comprised of a 'describe' operator, which can be joined to other pipelines.

A list of all supported statistic options can be found below:

name type description
minimum string Computes the maximum of each provided column
maximum string Computes the minimum of each provided column
range string Computes the range of each provided column
length string Counts the length of the batch provided
total string Computes the total sum of each provided column
average string Computes the average of each provided column
numDistinct string Counts the number of distinct elements in each provided column
numNull string Counts the number of null elements in each provided column
numInfinity string Counts the number of infinite elements in each provided column
median string Computes the median of each provided column
quartiles string Computes the quartiles of each provided column
frequency string Creates a frequency dictionary for each provided column
mode string Computes all modes of each provided column
sampleVar string Computes the sample variance of each provided column
sampleStd string Computes the sample standard deviation of each provided column
populationVar string Computes the population variance of each provided column
populationStd string Computes the population standard deviation of each provided column
standardError string Computes the standard error of each provided column
skew* string Computes the skewness of each provided column
percentiles tuple Computes the specified percentiles on each provided column

*calculated using the Fisher-Pearson coefficient of skewness

  • Note - some statistics do not support categorical data and will return generic null for said data

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd

>>> sp.run(sp.read.from_callback('publish')
       | sp.stats.describe('x', 'average')
       | sp.write.to_variable('out'))

>>> data = pd.DataFrame({
      'x':[5,1,4,2,3],
      'y':[100,100,200,50,50]
    })

>>> kx.q('publish', data)
average_x
---------
3

Using percentiles along with other stats

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd

>>> sp.run(sp.read.from_expr('([] x: 1 2 2 3 3 3 4 4 4 4)')
      | sp.stats.describe('x', ['mode', 'skew', ('percentiles', [0.9, 0.95, 0.99])])
      | sp.write.to_variable('out'))

>>> data = pd.DataFrame({'x': [1, 2, 2, 3, 3, 3, 4, 4, 4, 5]})

>>> kx.q('publish', data)
mode_x skew_x     percentile_0.9_x percentile_0.95_x percentile_0.99_x
----------------------------------------------------------------------
3 4    -0.1678308 4.1              4.55              4.91

ema

@StatsOperator
def ema(X: Union[str, List[str]], alpha: float,
        y: Union[str, List[str]]) -> StatsOperator

Computes a running exponential moving average

Arguments:

  • X - A single column name or list of column names on which to compute the statistics
  • alpha - The decay rate to use
  • y - A single column name or list of column names to output results to ** The number of source and destination columns must match **

Returns:

A pipeline comprised of a 'ema' operator, which can be joined to other pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
          | sp.stats.ema('x', 0.33, 'res')
          | sp.write.to_variable('out'))

>>> data = pd.DataFrame({
        'x': [1, 50, 3, 4, 5, 6]
    })

>>> kx.q('publish', data)
x  res
-----------
1  1
50 17.17
3  12.4939
4  9.690913
5  8.142912
6  7.435751

sma

@StatsOperator
def sma(X: Union[str, List[str]], window: int,
        y: Union[str, List[str]]) -> StatsOperator

Computes a running simple moving average

Arguments:

  • X - A single column name or list of column names on which to compute the statistics
  • window - The size of the window which should be used to calculate the average
  • y - A single column name or list of column names to output results to ** The number of source and destination columns must match **

Returns:

A pipeline comprised of a 'sma' operator, which can be joined to other pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.stats.sma('x', 3, 'res')
        | sp.write.to_variable('out'))

>>> data = pd.DataFrame({
        'x': [1, 50, 3, 4, 5, 6]
    })

>>> kx.q('publish', data)
x  res
-------
1  1
50 25.5
3  18
4  19
5  4
6  5

twa

@StatsOperator
def twa(X: Union[str, List[str]], times: str, window: int,
        y: Union[str, List[str]]) -> StatsOperator

Computes a running time-weighted average

Arguments:

  • X - A single column name or list of column names on which to compute the statistics
  • times - A list of times to be used for weighting
  • window - The size of the window which should be used to calculate the average
  • y - A single column name or list of column names to output results to ** The number of source and destination columns must match **

Returns:

A pipeline comprised of a 'twa' operator, which can be joined to other pipelines.

This calculates, for each data point, the arithmetic mean of a moving window including that point and the n-1 prior data points weighted by the time deltas found in times.

The incoming data must be sorted, because the average is calculated using the deltas between each timestamp. Out of order data would cause negative weight to be applied to the calculation.

>>> from kxi import sp
>>> from datetime import timedelta
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.stats.twa('x', 'time', 3, 'res')
        | sp.write.to_variable('out'))

>>> data = pd.DataFrame({
    'x': range(1,6),
    'time': [timedelta(seconds=x) for x in [0, 5, 6, 14, 17]]
    })

>>> kx.q('publish', data)
x  time                 res
--------------------------------
1  0D00:00:00.000000000 1
2  0D00:00:05.000000000 2
3  0D00:00:06.000000000 2.166667
4  0D00:00:14.000000000 3.214286
5  0D00:00:17.000000000 4.166667