Cluster setups
The purpose of this page is to help you set up single-node or multi-node clusters.
You can run KX for Databricks on both single-node and multi-node computes. Choose the compute and RAM sizing based on your data and pipeline requirements.
For the best experience, read the KX for Databricks Overview and the Get started guide first.
Let's explore examples for two types of cluster setup: a single-node workflow and a multi-node workflow.
Pre-requisites
The following pre-requisites must be satisfied to run the notebooks below:
- A single-node or multi-node compute is configured
- The latest version of PyKX is installed
- The QLIC environment variable points to a valid kc.lic file
- Trade, quote, and execution data is loaded from a Databricks Unity Catalog
For more information refer to the Get Started documentation.
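As a quick sanity check of the PyKX and license pre-requisites, you can run a minimal sketch like the following in a notebook cell (the /dbfs/tmp/ path matches the one used later on this page; substitute the directory that holds your kc.lic):
import os
os.environ["QLIC"] = "/dbfs/tmp/"  # directory containing kc.lic; adjust to your setup
import pykx as kx
print(kx.q("til 3"))  # any q evaluation succeeds only if a valid license is found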
1. Single-node workflow
A single-node compute runs Spark locally: the driver node acts as both master and worker, with no separate worker nodes. It is intended for jobs that use small amounts of data or for non-distributed workloads such as single-node machine learning libraries.
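For reference, a single-node compute corresponds to a cluster specification with zero workers. The sketch below shows the relevant fields of a Databricks Clusters API payload; the cluster name is hypothetical, and the runtime version and node type are placeholders:
single_node_spec = {
    "cluster_name": "kx-single-node",      # hypothetical name
    "spark_version": "<runtime-version>",  # placeholder
    "node_type_id": "<node-type>",         # placeholder
    "num_workers": 0,                      # no separate workers; a multi-node compute sets this > 0
    "spark_conf": {
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]",
    },
    "custom_tags": {"ResourceClass": "SingleNode"},
}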
Import the required Python libraries:
import pykx as kx
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
Hourly OHLC - Pythonic vs. q magic
OHLC is the open, high, low, and close price for a given period.
The example below calculates the hourly OHLC in two ways: first by calling q from Python through PyKX, and then with the %%q magic.
Calculate the OHLC by calling q from Python:
kx.q("{select Open:first price,High:max price,Low:min price,Close:last price by bucket:0D01 xbar time,sym from x}", trade).head()
This should produce the following output:
bucket sym Open High Low Close
2024.07.04D04:00:00.000000000 AEF 617f 617f 617f 617f
BKR 93.21 93.21 93.21 93.21
CLMT 2327.99 2327.99 2327.99 2327.99
CMG 780f 780f 777.6 780f
F.PRA 333.99 336.79 332.6 336.69
Calculate the OHLC using q magic:
%%q
select Open:first price,High:max price,Low:min price,Close:last price by bucket:0D01 xbar time,sym from trade
This should produce the following output:
bucket sym | Open High Low Close
------------------------------------| -------------------------------
2024.07.04D04:00:00.000000000 AEF | 617 617 617 617
2024.07.04D04:00:00.000000000 BKR | 93.21 93.21 93.21 93.21
2024.07.04D04:00:00.000000000 CLMT | 2327.99 2327.99 2327.99 2327.99
2024.07.04D04:00:00.000000000 CMG | 780 780 777.6 780
2024.07.04D04:00:00.000000000 F.PRA | 333.99 336.79 332.6 336.69
Notice how both examples produce the same result.
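If you prefer to avoid embedded q strings altogether, the same query can also be expressed through PyKX's qsql interface; a sketch over the same trade table:
kx.q.qsql.select(
    trade,
    columns={"Open": "first price", "High": "max price", "Low": "min price", "Close": "last price"},
    by={"bucket": "0D01 xbar time", "sym": "sym"},
).head()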
VWAP
VWAP - The average price a security has traded at throughout the day, based on both volume and price.
The example below calculates the volume-weighted average price (VWAP) using q magic:
%%q
select vwap:size wavg price by sym from trade
Output
sym | vwap
------| --------
AEF | 625.727
AGND | 228.8684
AUY | 251.6101
AXJL | 494.2899
AXX | 326.6949
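The same VWAP can be computed from Python without the %%q magic by using the qsql interface; a minimal equivalent:
kx.q.qsql.select(trade, columns={"vwap": "size wavg price"}, by={"sym": "sym"})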
Volatility
Volatility - A statistical measure of the dispersion of returns for a given security or market index.
The example below calculates the volatility for the stock AEF.
Define the functions used to calculate the volatility:
# q lambda: take the mid price, then the square root of an EMA (factor 0.001) of squared log returns
get_volatility_pykx = kx.q("{[bid;ask] {r:0^log[x]-log next x;sqrt ema[0.001] r*r } 0.5*ask+bid}")

def get_volatility(quote_tbl, sym) -> dict:
    # Restrict to the given symbol during regular trading hours (09:25 - 16:00)
    subset_quotes = quote_tbl.loc[
        (quote_tbl["sym"] == sym)
        & (quote_tbl["time"] >= timedelta(hours=9, minutes=25))
        & (quote_tbl["time"] < timedelta(hours=16, minutes=0))
    ]
    return {
        "datetime": subset_quotes["time"],
        "volatility": get_volatility_pykx(subset_quotes["bid"], subset_quotes["ask"])
    }
Pass the quote table and the AEF symbol to the get_volatility function:
get_volatility(quote, 'AEF')
Output
{'datetime': pykx.TimestampVector(pykx.q('2024.07.04D09:25:03.491000000 2024.07.04D09:25:46.166000000 2024.07.04D09:27:..')),
'volatility': pykx.FloatVector(pykx.q('0.003473635 0.003496871 0.003534634 0.003532866 0.003612577 0.003686721 0.003..'))}
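To visualize the series, convert the returned PyKX vectors to NumPy arrays and plot them; a minimal sketch assuming matplotlib is available (it ships with Databricks runtimes):
import matplotlib.pyplot as plt
vol = get_volatility(quote, 'AEF')
plt.plot(vol["datetime"].np(), vol["volatility"].np())  # .np() converts PyKX vectors to NumPy
plt.title("AEF intraday volatility")
plt.show()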
Spread
Spread - The difference or gap that exists between two prices, rates, or yields.
The example below calculates the moving average over the spread for the stock AEF.
Define the functions used to calculate the moving average over the spread:
def get_mavg_of_spread(bid, ask, mavg_size: int):
    # Moving average of the bid/ask spread via convolution with a flat window
    spreads = ask - bid
    return np.convolve(spreads, np.ones(mavg_size), mode="same") / mavg_size

# Register the Python function in q so it can be called from qsql expressions
kx.q["mavg_of_spread"] = get_mavg_of_spread

def get_spread(quotes: kx.Table, sym: str) -> kx.Table:
    return kx.q.qsql.select(
        quotes,
        columns={"datetime": "time", "spread": "mavg_of_spread[bid; ask; 1000]", "bid": "bid", "ask": "ask"},
        where=[f'sym=`$"{sym}"', "bid>0", "ask>0", "time>=09:25", "time<16:00"]
    )
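Note that assigning a Python callable into kx.q is what makes mavg_of_spread usable inside the q expression above. A minimal illustration of the same mechanism, with a hypothetical double_it helper:
kx.q["double_it"] = lambda x: x.np() * 2  # register a Python function under a q name
kx.q("double_it 1 2 3")                   # q calls back into Python, returning 2 4 6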
Pass the quote table and the AEF symbol to the get_spread function:
get_spread(quote, 'AEF').head()
Output
datetime spread bid ask
0 2024.07.04D09:25:03.491000000 15.07707 580.67 653.61
1 2024.07.04D09:25:46.166000000 15.07735 600f 630f
2 2024.07.04D09:27:01.015000000 15.07766 588.88 625f
3 2024.07.04D09:27:10.556000000 15.07844 580.67 653.61
4 2024.07.04D09:27:10.557000000 15.07873 580.67 653.61
Slippage
Slippage - The difference between the expected price of a trade and the price at which the trade is executed.
The example below calculates the slippage relative to the midpoint of the prevailing quoted bid and ask prices.
Create the merged table by performing an as-of join (aj) on the execs and quote tables; the join pairs each execution with the most recent quote at or before its time, per symbol:
merged = kx.q.aj(kx.SymbolVector(["sym", "time"]), execs, quote)
Calculate the mid, diff, and slippage columns; multiplying by 10,000 expresses the slippage in basis points:
merged["mid"] = 0.5 * (merged["bid"] + merged["ask"])
merged["diff"] = np.where(
merged["side"] == "BUY", merged["mid"] - merged["price"], merged["price"] - merged["mid"]
)
merged["slippage"] = (merged["diff"] / merged["mid"]) * 10000
merged.head()
Output
date sym time price size side venue bid ask bsize asize mode ex mid diff slippage
0 2022.03.31 GE.PRA 2024.07.04D09:30:02.676000000 397.6131 12 BUY venue1 396.51 397.96 1 1 R N 397.235 -0.3781 -9.518295
1 2022.03.31 MQC 2024.07.04D09:30:02.851000000 254.8359 258 SELL venue3 254.32 254.99 2 1 R V 254.655 0.1809 7.103729
2 2022.03.31 BKR 2024.07.04D09:30:03.426000000 93.64192 154 BUY venue2 93.64 93.72 1 1 R N 93.68 0.03808 4.064902
3 2022.03.31 GE.PRA 2024.07.04D09:30:04.489000000 397.5469 12 SELL venue2 397.21 398.8 1 2 R T 398.005 -0.4581 -11.50991
4 2022.03.31 DXGE 2024.07.04D09:30:06.677000000 570.1041 60 SELL venue3 569.08 571.34 2 3 R P 570.21 -0.1059 -1.857211
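With per-execution slippage in hand, a natural follow-up is aggregation, for example the average slippage by venue via the qsql interface (a sketch):
kx.q.qsql.select(merged, columns={"avg_slippage": "avg slippage"}, by={"venue": "venue"})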
2. Multi-node workflow
A multi-node (distributed) compute consists of one driver node and one or more worker nodes.
The driver node maintains the state of all notebooks attached to the compute, maintains the SparkContext, interprets the commands you run from a notebook or a library on the compute, and runs the Apache Spark master that coordinates with the Spark executors. Worker nodes run the Spark executors and the other services required for a properly functioning compute.
Use multi-node compute for larger jobs with distributed workloads.
Before you start, make sure you meet the pre-requisites at the top of this page.
Import required libraries
Import the required PySpark and Python libraries, and enable Arrow-based data transfer between Spark and pandas, by running:
import pykx as kx
import copy as cp
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, lit
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, DateType, IntegerType

# Use Apache Arrow for efficient Spark <-> pandas data transfer
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
Prepare the data
The examples below use the full dataset with PySpark and a subset of the data with the Python driver.
Create the trade_sdf_all_syms and quote_sdf_all_syms Spark DataFrames, containing trade and quote data respectively, using Spark SQL:
trade_sdf_all_syms = spark.sql('SELECT TTime as time, Symbol as sym, cast(Trade_Volume AS INT) as size, Trade_Price as price FROM default.nyse_trades')
quote_sdf_all_syms = spark.sql('SELECT TTime as time, Symbol as sym, Bid_Size as bsize, Bid_Price as bid, Offer_Size as asize, Offer_Price as ask FROM default.nyse_quotes')
Filter the data to a subset of the tickers. Note that the SELECT statements above renamed the columns, so the filter references sym and time:
SYMBOLS = ['GOOG', 'AAPL', 'FB', 'AMZN', 'MSFT']
trade_sdf_few_syms = trade_sdf_all_syms.where(F.col("sym").isin(SYMBOLS)).orderBy(["sym", "time"])
quote_sdf_few_syms = quote_sdf_all_syms.where(F.col("sym").isin(SYMBOLS)).orderBy(["sym", "time"])
Convert the Spark DataFrames to pandas DataFrames:
trade_pdf = trade_sdf_few_syms.toPandas()
quote_pdf = quote_sdf_few_syms.toPandas()
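Optionally, confirm the filtered row counts before handing the data to PyKX:
print(f"trades: {len(trade_pdf):,}  quotes: {len(quote_pdf):,}")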
Slippage markouts (Python driver / single node)
Slippage - The difference between the expected price of a trade and the price at which the trade is executed.
The example below uses PyKX to calculate the execution slippage markouts for a subset of the data (5 tickers) using just the Python driver (single node).
Define the functions used to calculate the slippage markouts:
# Adjust the time column into the past/future by an offset, e.g. 10 20 30 seconds, 1 5 10 minutes
def time_offset(trade, offset):
    offset_trade = cp.copy(trade)
    offset_trade['time'] = (offset_trade['time'] + offset).sorted()
    return offset_trade[['sym', 'time']]

# Compute N markouts and calculate slippage using the mid price
def trade_to_mid_markouts(trade, quote):
    import os
    os.environ['QLIC'] = '/dbfs/tmp/'  # each worker needs to locate the license
    import pykx as kx
    trade = kx.toq(trade)
    quote = kx.toq(quote)
    trade = trade.sort_values(['time']).grouped('sym')
    quote = quote.sort_values(['time']).grouped('sym')
    quote['mid'] = (quote['ask'] + quote['bid']) / 2
    # Markouts we are interested in
    seconds = [1, 10, 30]
    minutes = [1, 5, 10, 30]
    qseconds = kx.toq(seconds).cast(kx.SecondVector)
    qminutes = kx.toq(minutes).cast(kx.MinuteVector)
    # tp* columns look forward in time, tm* columns look backward
    for s, q in zip(seconds, qseconds):
        trade['tp'+str(s)+'s'] = trade['price'] - time_offset(trade, q).merge_asof(quote[['sym', 'time', 'mid']], on=['sym', 'time'])['mid']
        trade['tm'+str(s)+'s'] = trade['price'] - time_offset(trade, -q).merge_asof(quote[['sym', 'time', 'mid']], on=['sym', 'time'])['mid']
    for m, q in zip(minutes, qminutes):
        trade['tp'+str(m)+'m'] = trade['price'] - time_offset(trade, q).merge_asof(quote[['sym', 'time', 'mid']], on=['sym', 'time'])['mid']
        trade['tm'+str(m)+'m'] = trade['price'] - time_offset(trade, -q).merge_asof(quote[['sym', 'time', 'mid']], on=['sym', 'time'])['mid']
    return trade.pd()
Pass the trade_pdf and quote_pdf pandas DataFrames to the trade_to_mid_markouts function to calculate the slippage:
result = trade_to_mid_markouts(trade_pdf, quote_pdf)
print('Executions input row count - ', f'{trade_pdf.shape[0]:,}')
print('Market quotes input row count - ', f'{quote_pdf.shape[0]:,}')
print('TCA report result row count - ', f'{kx.q.count(result).np():,}')
result.tail()
Because we supplied pandas DataFrames, the workload is performed entirely on the Python driver.
Output
Executions input row count - 1,851,406
Market quotes input row count - 27,424,147
TCA report result row count - 1,851,406
time sym size price tp1s tm1s tp10s tm10s tp30s tm30s tp1m tm1m tp5m tm5m tp10m tm10m tp30m tm30m
1851401 2022-10-03 19:59:55.559408 AAPL 1 142.935 0.000 0.000 -0.025 0.010 -0.025 0.045 -0.025 -0.025 -0.025 -0.025 -0.025 -0.025 -0.025 -0.025
1851402 2022-10-03 19:59:58.444119 MSFT 25 241.700 0.185 0.185 0.185 0.185 0.185 3.200 0.185 0.185 0.185 0.185 0.185 0.185 0.185 0.185
1851403 2022-10-03 19:59:59.146947 AAPL 1 142.935 -0.025 0.000 -0.025 0.010 -0.025 0.055 -0.025 -0.025 -0.025 -0.025 -0.025 -0.025 -0.025 -0.025
1851404 2022-10-03 19:59:59.551589 MSFT 1 241.700 0.185 0.185 0.185 0.185 0.185 3.200 0.185 0.185 0.185 0.185 0.185 0.185 0.185 0.185
1851405 2022-10-03 19:59:59.668143 MSFT 150 241.700 0.185 0.185 0.185 0.185 0.185 3.200 0.185 0.185 0.185 0.185 0.185 0.185 0.185 0.1815
Slippage markouts (PySpark / multi node)
The example below uses PyKX to calculate the execution slippage markouts for the full dataset (11,686 tickers) using PySpark (multi node).
Prepare the schema for the results DataFrame:
full_schema = "time timestamp, sym string, size int, price double"
seconds = [1, 10, 30]
minutes = [1, 5, 10, 30]
for s in seconds:
    full_schema += ', tp' + str(s) + 's float'
    full_schema += ', tm' + str(s) + 's float'
for m in minutes:
    full_schema += ', tp' + str(m) + 'm float'
    full_schema += ', tm' + str(m) + 'm float'
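You can print the assembled schema to verify it matches the columns returned by trade_to_mid_markouts:
print(full_schema)
# time timestamp, sym string, size int, price double, tp1s float, tm1s float, tp10s float,
# tm10s float, tp30s float, tm30s float, tp1m float, tm1m float, tp5m float, tm5m float,
# tp10m float, tm10m float, tp30m float, tm30m float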
Pass the trade_sdf_all_syms and quote_sdf_all_syms Spark DataFrames to the trade_to_mid_markouts function to calculate the slippage:
result_pyspark = trade_sdf_all_syms.groupby('sym').cogroup(quote_sdf_all_syms.groupby('sym')).applyInPandas(trade_to_mid_markouts, schema=full_schema)
print('Executions input row count - ', f'{trade_sdf_all_syms.count():,}')
print('Market quotes input row count - ', f'{quote_sdf_all_syms.count():,}')
print('TCA report result row count - ', f'{result_pyspark.count():,}')
result_pyspark.show(5)
Because we supplied Spark DataFrames, the workload is farmed out to the configured worker nodes for processing before the result is returned to the Python driver.
Output
Executions input row count - 77,026,358
Market quotes input row count - 1,541,781,424
TCA report result row count - 77,026,358
+--------------------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+-----+-----+
| time| sym|size|price| tp1s| tm1s|tp10s|tm10s|tp30s|tm30s| tp1m| tm1m| tp5m| tm5m| tp10m| tm10m|tp30m|tm30m|
+--------------------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+-----+-----+
|2022-10-03 09:16:...|AKIC U| 85| 9.95| 0.21| 0.21| 0.21| 0.21| 0.21| 0.21| 0.21| 0.21| 0.17| 0.17|-0.265|-0.265|-0.03|-0.03|
|2022-10-03 11:20:...|AKIC U| 9| 9.96| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0|
|2022-10-03 11:56:...|AKIC U| 21| 9.96| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -1.0| -0.02| -0.02|-0.02|-0.02|
|2022-10-03 13:36:...|AKIC U| 21| 9.95|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01| -0.01| -0.01|-0.01|-0.01|
|2022-10-03 13:36:...|AKIC U| 21| 9.95|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01|-0.01| -0.01| -0.01|-0.01|-0.01|
+--------------------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+-----+-----+
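To persist the distributed report for downstream analysis, you could write it back to the catalog; the table name default.tca_report below is a hypothetical choice:
result_pyspark.write.mode("overwrite").saveAsTable("default.tca_report")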