1.4 Segments

1.4.0 Overview

Recall that a query against a partitioned table executes sequentially across the required partitions. I/O bandwidth limits performance on large partitioned tables because retrieving data from physical storage is much slower than processing that data once it is in memory. Simply put, the q process will spend time waiting for data, do a flurry of calculations, only to wait on the next chunk of data. The way to address this is to take advantage of parallel I/O and concurrent processing. To this end, we aim to spread a table across multiple I/O channels to facilitate parallel retrieval.

1.4.1 Segmented Tables

Segmentation is an additional level of structure on top of partitioning. Segmentation spreads a partitioned table’s records across multiple storage locations. Each subset of records, called a segment, is a directory that contains a collection of partition directories. The segment directories are presumably on different I/O channels so that data retrieval can occur without contention.

You can use any criteria to decompose partition slices, as long as you end up with conforming record subsets that are disjoint and complete—i.e., they reconstitute the original table with no omissions or duplication. The decomposition can be along rows, along partitions or by some combination thereof, but it cannot occur along columns.

Note: You must ensure that the segments conform and are complete and disjoint, since kdb+ will not check this when you write the data files. In particular, overlapping segments will result in duplicate records in query results and an incomplete decomposition will result in dropped records.
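
Before writing, you can sanity-check a proposed decomposition in q. Here is a minimal sketch (the function chkseg and the approach are our own, and it assumes duplicate-free rows): it confirms that the pieces reconstitute the original slice with nothing dropped or doubled.

    chkseg:{[orig;pieces]
      u:raze pieces;                       / reconstituted table
      (count[orig]=count u) and            / disjoint: no row written twice
        0=count orig except u}             / complete: no row dropped

For example, given a hypothetical slice t with a sym column, chkseg[t; (select from t where sym<`n; select from t where sym>=`n)] should return 1b.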

Big Picture (3): We think of a segmented table as a three-dimensional persisted form: the table is cut vertically by splaying, sliced horizontally by partitions and is additionally spread across physical locations in segments. The primary purpose of the third dimension is to allow operations against the tables to take advantage of parallel I/O and concurrent processing. Following is an abstract representation of segmentation:

segment1

            column*  column*
partition*     ▪        ▪
partition*     ▪        ▪

segment2

            column*  column*
partition*     ▪        ▪
partition*     ▪        ▪

segment3

            column*  column*
partition*     ▪        ▪
partition*     ▪        ▪

Here each ▪ represents a (partial) column of a partition slice as an opaque entity.

1.4.1.1 Contrasting Partitions and Segments

Although partitioning and segmentation are similar in that they both group a table’s records into directories, there are fundamental differences. Remember, segmentation sits above partitioning, so all segmented tables are partitioned but the converse is not true.

Record location
  Partitioned: All partitions (and hence all records) reside under the root directory.
  Segmented:   None of the segments (and hence no records) reside under the root.

I/O channels
  Partitioned: All partitions (and hence all records) reside on a single I/O channel.
  Segmented:   The segments (and hence the records) should reside on multiple I/O channels.

Processing
  Partitioned: Partitions are loaded and processed sequentially in aggregation queries.
  Segmented:   Given appropriate slaves and cores, aggregate queries load segments in parallel and process them concurrently.

Decomposition
  Partitioned: Partition by grouping rows on the values of a virtual column of underlying integral type.
  Segmented:   Segment by any criteria yielding a disjoint and complete decomposition.

Symbols
  Partitioned: Cannot partition on a symbol column.
  Segmented:   Can segment along a symbol column.

Virtual column
  Partitioned: The partition column is not stored; its values are inferred from the directory names.
  Segmented:   No special column is associated with segmentation (the virtual column from the underlying partitioning is still present).

1.4.1.2 Segment Layout Examples

In contrast to the partitioned table layout in which partitions reside under the root, the segment directories must not reside under the root. The only portion of a segmented table (other than the sym file for enumerated symbol columns) that lives in the root is a file called par.txt, which contains the paths of the physical locations of the segments, one segment path per line.
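
Since par.txt is plain text, you can inspect it from within q with read0; a quick check, assuming the root is /db:

    read0 `:/db/par.txt     / one segment path per line, e.g. "/segment1" "/segment2"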

Here is how the abstract segmentation discussed above would be laid out on the file system:

/db
    [sym]
    par.txt
===============  <- channel 1
/segment1
    /partition*
        /table*
        /table*
        …
    /partition*
        /table*
        /table*
        …
===============  <- channel 2
/segment2
    /partition*
        /table*
        /table*
        …
    /partition*
        /table*
        /table*
    …
===============
…

To make this more concrete, we demonstrate how to segment daily trades (and eventually quotes) in several useful ways. To begin, we create segments by bucketing trades into alternating days of the week:

/1                  <- drive 1
    /2009.01.01
        /t          <- day’s trades
    /2009.01.03
        /t          <- day’s trades
    …
=============
/2                  <- drive 2
    /2009.01.02
        /t          <- day’s trades
    /2009.01.04
        /t          <- day’s trades
    …

This segmentation represents grouping of partitions, so it is orthogonal to the partitioning. It is clearly complete and disjoint and is easily generalizable to round-robining every n business days.
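
As a sketch of that generalization, the following hypothetical helper assigns a date to one of n round-robin segments, assuming days holds the ordered list of business days:

    segidx:{[days;n;d] 1+(days?d) mod n}
    segidx[2009.01.01 2009.01.02 2009.01.03 2009.01.04;2] each 2009.01.01 2009.01.03   / yields 1 1

With n=2 this reproduces the alternating-day layout above: days 1 and 3 land in segment /1, days 2 and 4 in segment /2.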

We could alternatively create segments by splitting the daily slices into records with symbols starting with a-m and those starting with n-z. Here we are decomposing based on the values in a symbol column, which we could not do with simple partitioning.

/am                 <- drive 1
    /2009.01.01
        /t          <- day’s trades for syms a-m
    /2009.01.02
        /t          <- day’s trades for syms a-m
    …
=============
/nz                 <- drive 2
    /2009.01.01
        /t          <- day’s trades for syms n-z
    /2009.01.02
        /t          <- day’s trades for syms n-z
    …

This segmentation clearly results in complete, disjoint subsets. It is not orthogonal to the partitioning because a single day's trades span multiple segments. It is easily generalizable to n segments by splitting the alphabet into n roughly equal portions.
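
Here is a sketch of how the n ranges might be computed, using .Q.a (the lowercase alphabet); each resulting pair is a (first;last) range of the kind consumed by the extr utility defined in §1.4.2.2 below:

    rng:{[n] {`$(1#x;-1#x)} each (ceiling 26%n) cut .Q.a}
    rng 2     / yields (`a`m;`n`z)

Note that the split is only roughly even by letter, not by record volume; a production segmentation would weight the ranges by actual symbol frequency.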

Alternately, we can create segments by splitting the daily slices into trades from NYSE and trades from NASDAQ.

/nyse               <- drive 1
    /2009.01.01
        /t          <- day’s trades for nyse
    /2009.01.02
        /t          <- day’s trades for nyse
    …
=============
/nasd               <- drive 2
    /2009.01.01
        /t          <- day’s trades for nasd
    /2009.01.02
        /t          <- day’s trades for nasd
    …

The segmentation is also not orthogonal to the partitioning since a single day's trades span multiple segments. It is clearly complete and disjoint and is easily generalizable to multiple exchanges—e.g., all Asian exchanges.

Finally, we provide an example of a non-uniform segmentation, in that some partitions span segments and others do not. The A segment contains trades from the beginning of day 1 until lunchtime of day 2. The B segment contains the trades from lunchtime of day 2 through the end of day 3. The C segment contains all trades from day 4.

/seg A              <- drive 1
    /2009.01.01
        /t          <- entire day’s trades
    /2009.01.02
        /t          <- morning trades
=============
/seg B              <- drive 2
    /2009.01.02
        /t          <- afternoon trades
    /2009.01.03
        /t          <- entire day’s trades
=============
/seg C              <- drive 3
    /2009.01.04
        /t          <- entire day’s trades

This segmentation is disjoint and complete and it is not orthogonal to the partitioning.

1.4.2 Creating Segments

As with partitions, there is no one-size-fits-all utility to create segments. Instead, you write a q program that places the appropriate subset of each partition slice into the correct segment directory. You can create historical segments and partitions at the same time by including logic in your load script to extract slice subsets and place them in the appropriate directory on the appropriate drive.

Along with creating the partitions, you must also specify the locations of the segments in an ASCII text file par.txt located in the root. Each line of par.txt contains the path of one segment directory; symlinks are acceptable.

Important: The segment paths must not be under the root. Beginners sometimes make this mistake and the result is like listening to Vogon poetry.

We illustrate the process by creating segmented tables for each example in the previous section. Our intent is to demonstrate how to create the segments and see that execution of the query pieces the segment results back together. We run our simple queries without concern for placing the segments on multiple drives and running multiple slaves, as we are not (yet) considering performance.

1.4.2.1 Trades Segmented by Day of Week

Start with a fresh /db. This segmentation groups partitions in alternating days as an example of round-robining. It resides in the following directory structure:

/1
    /2009.01.01
        /t
    /2009.01.03
        /t
/2
    /2009.01.02
        /t
    /2009.01.04
        /t

The corresponding par.txt is:

/1
/2

A day’s trade table has the schema:

([] ti:`time$(); s:`sym$(); p:`float$())

Segment construction is simple: put successive days in alternating segments.

    `:/1/2009.01.01/t/ set
        ([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`t; p:101 17f)

    `:/2/2009.01.02/t/ set
        ([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`t; p:101.5 17.5)

    `:/1/2009.01.03/t/ set
        ([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`t; p:103 16.5f)

    `:/2/2009.01.04/t/ set
        ([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`t; p:102 17f)

    `:/db/par.txt 0: ("/1"; "/2")
`:/db/par.txt

    \l /db
    select from t where date within 2009.01.01 2009.01.04
date       ti       s   p
-----------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:30:00 ibm 101.5
2009.01.02 09:31:00 t   17.5
2009.01.03 09:30:00 ibm 103
2009.01.03 09:31:00 t   16.5
2009.01.04 09:30:00 ibm 102
2009.01.04 09:31:00 t   17

1.4.2.2 Trades Segmented by Symbol Range

Start with a fresh /db. The segments for symbols a-m and n-z reside in the directory structure:

/am
    /2009.01.01
    /2009.01.02
/nz
    /2009.01.01
    /2009.01.02

The corresponding par.txt is:

/am
/nz

A day’s trade table has the schema:

([] ti:`time$(); sym:`sym$(); p:`float$())

Do not confuse the column sym with the enumeration sym or the file named sym. They are all distinct.
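
A tiny sketch with hypothetical values makes the distinction concrete:

    sym:`ibm`t           / in-memory image of the sym file's contents
    e:`sym$`ibm`t        / values enumerated over the domain sym
    t0:([] sym:e)        / a column that merely happens to be named sym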

We make a utility function to extract trades by symbol range:

extr:{[t;r] select from t where (`$1#'string sym) within r}

We spread each day’s trades across the two segments based on values in the column sym of type symbol. Observe that we store sym along with the other columns.

    t1:([] ti:09:30:00 09:31:00; sym:`:/db/sym?`ibm`t; p:101 17f)

    `:/am/2009.01.01/t/ set extr[t1;`a`m]
`:/am/2009.01.01/t/
    `:/nz/2009.01.01/t/ set extr[t1;`n`z]
`:/nz/2009.01.01/t/

    t2:([] ti:09:30:00 09:31:00; sym:`:/db/sym?`ibm`t; p:101.5 17.5)
    `:/am/2009.01.02/t/ set extr[t2;`a`m]
`:/am/2009.01.02/t/
    `:/nz/2009.01.02/t/ set extr[t2;`n`z]
`:/nz/2009.01.02/t/

Now we create par.txt and map the table:

    `:/db/par.txt 0: ("/am"; "/nz")
`:/db/par.txt

    \l /db
    select from t where date within 2009.01.01 2009.01.02
date       ti       sym p
-------------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:30:00 ibm 101.5
2009.01.02 09:31:00 t   17.5

1.4.2.3 Trades Segmented by Exchange

Start with a fresh /db. The exchange segments reside in the following directory structure:

/nyse
    /2009.01.01
        /t
    /2009.01.02
        /t
/nasd
    /2009.01.01
        /t
    /2009.01.02
        /t

The corresponding par.txt is:

/nyse
/nasd

The trade table has the schema:

([] ti:`time$(); s:`sym$(); p:`float$(); ex:`sym$())

Segment construction is completely analogous to that of the previous example.

    extr:{[t;e] select from t where ex=e}

    t1:([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`aapl; p:101 17f;
        ex:`:/db/sym?`n`o)

    `:/nyse/2009.01.01/t/ set extr[t1;`n]
`:/nyse/2009.01.01/t/
    `:/nasd/2009.01.01/t/ set extr[t1;`o]
`:/nasd/2009.01.01/t/

    t2:([] ti:09:30:00 09:31:00; s:`:/db/sym?`aapl`ibm; p:143 102f;
        ex:`:/db/sym?`o`n)

    `:/nyse/2009.01.02/t/ set extr[t2;`n]
`:/nyse/2009.01.02/t/
    `:/nasd/2009.01.02/t/ set extr[t2;`o]
`:/nasd/2009.01.02/t/

    `:/db/par.txt 0: ("/nyse"; "/nasd")
`:/db/par.txt

    \l /db
    select from t where date within 2009.01.01 2009.01.02
date       ti       s    p   ex
-------------------------------
2009.01.01 09:30:00 ibm  101 n
2009.01.01 09:31:00 aapl 17  o
2009.01.02 09:31:00 ibm  102 n
2009.01.02 09:30:00 aapl 143 o

1.4.2.4 Trades Segmented across Partitions

Start again with a fresh /db. The segmentation layout is:

/A
    /2009.01.01
    /2009.01.02
/B
    /2009.01.02
    /2009.01.03
/C
    /2009.01.04

so par.txt is:

/A
/B
/C

Segment construction is straightforward.

    t1:([] ti:09:30:00 12:31:00; s:`:/db/sym?`ibm`t; p:101 17f)
    `:/A/2009.01.01/t/ set t1
`:/A/2009.01.01/t/

    t2:([] ti:09:31:00 12:32:00; s:`:/db/sym?`ibm`t; p:102 18f)
    `:/A/2009.01.02/t/ set select from t2 where ti<=12:00:00
`:/A/2009.01.02/t/

    `:/B/2009.01.02/t/ set select from t2 where ti>12:00:00
`:/B/2009.01.02/t/

    t3:([] ti:09:33:00 12:33:00; s:`:/db/sym?`ibm`t; p:103 19f)
    `:/B/2009.01.03/t/ set t3
`:/B/2009.01.03/t/

    t4:([] ti:09:34:00 12:35:00; s:`:/db/sym?`ibm`t; p:104 20f)
    `:/C/2009.01.04/t/ set t4
`:/C/2009.01.04/t/

    `:/db/par.txt 0: ("/A";"/B";"/C")
`:/db/par.txt

    \l /db
    select from t where date within 2009.01.01 2009.01.04
date       ti       s   p
---------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 12:31:00 t   17
2009.01.02 09:31:00 ibm 102
2009.01.02 12:32:00 t   18
2009.01.03 09:33:00 ibm 103
2009.01.03 12:33:00 t   19
2009.01.04 09:34:00 ibm 104
2009.01.04 12:35:00 t   20

1.4.3 Multiple Segmented Tables

Two tables that share the same partitioning are not required to share the same segmentation decomposition, but they often do. This is commonly the case with tables such as trades and quotes that are used together.

We demonstrate a shared segmentation for daily trades and quotes that incidentally addresses a problem of some level-two tick sources—e.g., OPRA data can contain more than 2 billion (i.e., 2*1000^3) records per day. Since a list in q cannot currently have more than 2 billion items, the resulting partial columns in the slice would be too large. Our solution is to segment by the first character of the symbol, dividing the alphabet into (roughly) equal ranges.

Note: As of this writing (Jul 2010), kdb+ does not perform map-reduce optimization for joins—e.g., lj, aj or wj. If you need to perform large joins, consider setting up a dedicated join server, as the performance characteristics of joins are significantly different from those of aggregations. By carefully constructing your segmentation and using peach, you can nudge kdb+ into parallel I/O and concurrent processing for joins.

Our actual segmentation layout is designed to allow parallel retrieval for aj and wj. We first observe that both perform an equijoin on a single symbol as well as the date virtual column. Also observe that they perform a non-equijoin on the time column, so we need all the time values for a given symbol and date in one segment. Consequently, if we segment by symbol range, we can parallelize aj or wj across symbols.
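
As a sketch of the idea, assuming the a-m/n-z segmented tables t and q constructed below, run the join per symbol range with peach so that each slave touches only its own segment (the helper ajr is hypothetical):

    ajr:{[d;r] aj[`sym`ti;
        select from t where date=d,(`$1#'string sym) within r;
        select from q where date=d,(`$1#'string sym) within r]}
    raze ajr[2009.01.01] peach (`a`m;`n`z)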

Here is a simple version of this scheme:

//drive1                <- physical drive 1
    /a_m                <- segment for first portion of alphabet
        /2009.01.01     <- the specific day
            /t          <- that day’s trades for symbols a-m
            /q          <- that day’s quotes for symbols a-m
        /2009.01.02     <- the specific day
            /t          <- that day’s trades for symbols a-m
            /q          <- that day’s quotes for symbols a-m
=================
//drive2                <- physical drive 2
    /n_z                <- segment for second portion of alphabet
        /2009.01.01     <- the specific day
            /t          <- that day’s trades for symbols n-z
            /q          <- that day’s quotes for symbols n-z
        /2009.01.02     <- the specific day
            /t          <- that day’s trades for symbols n-z
            /q          <- that day’s quotes for symbols n-z
=================
…

Here is the corresponding par.txt:

//drive1/a_m
//drive2/n_z
…

This segmentation is clearly disjoint and complete. From the above observations, we can use peach to achieve parallel I/O for aj and wj. One caution: were a query to return all quotes for a given day, the result would be too large to fit in a list if the (unsegmented) quote table exceeds 2 billion rows. You should guard against this in a real application.
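
One hedged way to guard, assuming the segmented quote table q is mapped: per-date row counts are cheap (count is map-reduced), so check whether any single day would overflow a list before querying a whole day.

    cnts:select n:count i by date from q
    exec any n>2000000000 from cnts     / 1b means some day is too big to load whole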

Setting up the segments and partitions is a simple matter of getting the details right. In our simplified example, we pretend that the directories /am and /nz reside on different drives.

    extr:{[t;r] select from t where (`$1#'string sym) within r}

/ day 1
    t:([] ti:09:30:00 09:31:00; sym:`:/db/sym?`ibm`t; p:101 17f)
    q:([] ti:09:29:59 09:29:59 09:30:00; sym:`:/db/sym?`ibm`t`ibm;
        p:100.5 17 101)

    `:/am/2009.01.01/t/ set extr[t;`a`m]
`:/am/2009.01.01/t/
    `:/nz/2009.01.01/t/ set extr[t;`n`z]
`:/nz/2009.01.01/t/

    `:/am/2009.01.01/q/ set extr[q;`a`m]
`:/am/2009.01.01/q/
    `:/nz/2009.01.01/q/ set extr[q;`n`z]
`:/nz/2009.01.01/q/

/ day 2
    t:([] ti:09:30:00 09:31:00; sym:`:/db/sym?`t`ibm; p:17.1 100.9)
    q:([] ti:09:29:59 09:29:59 09:30:00; sym:`:/db/sym?`t`ibm`t;
        p:17 100.8 17.1)

    `:/am/2009.01.02/t/ set extr[t;`a`m]
`:/am/2009.01.02/t/
    `:/nz/2009.01.02/t/ set extr[t;`n`z]
`:/nz/2009.01.02/t/
    `:/am/2009.01.02/q/ set extr[q;`a`m]
`:/am/2009.01.02/q/
    `:/nz/2009.01.02/q/ set extr[q;`n`z]
`:/nz/2009.01.02/q/

    `:/db/par.txt 0: ("/am"; "/nz")
`:/db/par.txt

    dr:2009.01.01 2009.01.02

    select from t where date within dr
date       ti       sym p
-----------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:31:00 ibm 100.9
2009.01.02 09:30:00 t   17.1

    select from q where date within dr
date       ti       sym p
-----------------------------
2009.01.01 09:29:59 ibm 100.5
2009.01.01 09:30:00 ibm 101
2009.01.01 09:29:59 t   17
2009.01.02 09:29:59 ibm 100.8
2009.01.02 09:29:59 t   17
2009.01.02 09:30:00 t   17.1

The naïve way to perform an asof join loads all requisite data at once.

    aj[`date`sym`ti;select from t where date within dr;
        select from q where date within dr]
date       ti       sym p
-----------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:31:00 ibm 100.8
2009.01.02 09:30:00 t   17.1

Provided multiple data channels and cores are available to kdb+, we start kdb+ with multiple slaves and mimic its map-reduce strategy to take advantage of parallel data load against our segmentation.

    >q -s 2
KDB+ ...

    aj1:{[d] aj[`sym`ti;select from t where date=d;
                        select from q where date=d]}

    raze aj1 peach 2009.01.01 2009.01.02
date       ti       sym p
-----------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:31:00 ibm 100.8
2009.01.02 09:30:00 t   17.1

1.4.4 Query Execution on Segmented Tables (Advanced)

As you would expect, query execution against a segmented table involves additional complexity beyond partitioned table execution. The goal is to scale out by taking advantage of parallel I/O and concurrent processing. We would ideally like to achieve 100% saturation of the I/O channels and 100% utilization of each core. How do we approach these levels on a kdb+ server?

1.4.4.1 Preliminaries

The prime directive in kdb+ design and tuning is that a vector calculation on in-memory data is much faster than retrieving that data from storage. This suggests our first two objectives:

(1) Maximize the number of independent I/O channels to retrieve data in parallel.

(2) Maximize server memory in order to allocate each slave thread as much memory as it needs.

Suppose we have satisfied these two objectives and we have storage devices attached to the kdb+ server over n independent I/O channels. Because kdb+ can process segments in parallel, we are led to our next observation.

(3) Create n segments that spread the table across the channels in order to maximize I/O parallelization.

The precise form of segmentation will depend on the actual data and queries, but we ignore this detail in this discussion.

To ensure that data can be processed from all n channels simultaneously and that no two threads contend for data, we are led to:

(4) Have (at least) n slave threads—henceforth abbreviated slaves.
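
For example, with storage on four channels you might start the process with four slaves; the -s command-line flag sets the slave count and the \s system command reports it:

    >q /db -s 4
KDB+ ...

    \s
4i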

1.4.4.2 Execution over Segments

Here we describe how kdb+ executes a query against a segmented table in our scenario of n segments and n slaves. Essentially, it decomposes the query into a map step and a reduce step.

Note: The use of map-reduce allows kdb+ to perform preliminary calculations as close to the data as possible and to perform aggregation centrally at the last step.

To begin, kdb+ compares the query’s requisite partitions to the segment layout in par.txt and determines the footprint of the target partitions on each segment. The result is a nested list, each item being the partition list for one segment.

To execute the map step, kdb+ creates a revised query containing the map sub-operation from the original query, which it dispatches to all n slaves via peach. Each slave is provided the partition list for one segment and is directed to compute the revised query for its segment. For example, the revised query for avg computes a sum and a count in each partition; the reduce step later divides the sum of the sums by the sum of the counts (see the instrumented output in §1.4.5).
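
The following sketch mimics this decomposition in ordinary q (not kdb+'s internal form), assuming ps is a list of in-memory partition slices, each with a numeric column p:

    map:{(sum "f"$x`p; count x)}        / per partition: a (sum;count) pair
    red:{(sum x[;0])%sum x[;1]}         / combine the pairs into the average
    red map each ps                     / e.g. ps:(([]p:1 2f);([]p:3 4 5f)) gives 3f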

Armed with this knowledge, we examine execution within one slave, where the revised query is applied against a segment’s partition footprint. Recalling §1.3.4, kdb+ sequentially applies the map sub-operations of the original query across the targeted partitions to obtain partition result tables that it collects into a list representing one segment result.

Now we stand back and examine execution across the slaves by squinting to make partition detail disappear. At this level, the original query’s map step has n slaves retrieving segment data in parallel and concurrently calculating segment results. Once all slaves complete, the nested list of segment results is flattened and reordered by partition value. Finally, kdb+ employs the original query reduce step to combine the full list of ordered partition results into the query result table. Whew!

Note: Kdb+ treats a vanilla partitioned table (i.e., without a par.txt) as having a single segment. The astute reader will realize that the description in §1.3.4 is actually the degenerate case of this section.

1.4.4.3 Slaves and Cores

Executing the query via map-reduce provides good progress toward our original objective of I/O saturation. The slaves can load data from all n channels in parallel without contention. We now investigate channel and core utilization.

Note: Kdb+ will only use as many slaves to process a query as there are segments in the query footprint.

I/O-bound scenario: Assuming that the query has light calculation compared to data retrieval (common in kdb+), having n slaves on n cores is close to optimal: most of the time, all n slaves will be waiting for data. When a partition load completes, there will be a brief burst of computation, followed by another long load. So we conclude that n slaves on n cores suffices when queries are I/O bound.

Balanced I/O-compute scenario: Now we consider the case in which the query’s calculation is intensive. While one slave is waiting on data, another slave on its core could be crunching; conversely, while one slave is crunching another slave on its core could be loading data. Therefore, to maximize channel and core utilization, we actually want 2n slaves on n cores. In view of the above note, we conclude that we should have 2n segments, two per channel.

On average, there will be one slave per core loading data and one slave per core crunching the data it has just loaded.

These conclusions rely on many implicit assumptions that we have glossed over. In practice, you should view them as guidelines, with an eye towards feeding data to kdb+ as fast as possible. The optimum configuration for your situation will depend on your particular query mix. For example, queries that do VWAP calculations are closer to the first scenario, whereas queries doing regression analysis are closer to the second.

A good strategy is to construct your initial configuration using one of the above scenarios. Then load a close approximation of your anticipated data and query mix, and simulate a realistic user load. Observe the I/O saturation and CPU utilization and adjust the number of slaves and cores allocated to the q process accordingly.

1.4.5 Examples

Our examples are based on simulated trade data randomly generated to match one month of US equity data spanning August and September 2009. The data is in the table trade, whose most significant columns time, sym, tp and ts represent, respectively, arrival time, instrument symbol, trade price and trade size.

The trade data is partitioned by date. The following output is generated from a version of kdb+ that is instrumented to show the intermediate steps during query execution.

The first example query shows how the simple aggregate avg tp is decomposed into a sum and a count in the map step, followed in the reduce step by division of the sum of the sums by the sum of the counts. Here the data is not segmented—i.e., there is no par.txt file.

The added instrumentation exposes 3 stages of execution of a query:

  1. Analyze query:
    • Decompose query into map and reduce components (if appropriate)
    • Determine and apply partition constraints
    • Map query onto segments and partitions (query plan)
  2. Execute map step, if appropriate
  3. Compute final result (reduce step)

q)select avg tp from trade where date in -3#date

"--- map/reduce: input aggregates, map query, reduce query ---"
(,`tp)!,(avg;`tp)
`0`1!((sum;($["f"];`tp));(#:;`i))
(,`tp)!,(%;(sum;`0);(sum;`1))

"--- query plan: segments/partitions ---"
(`trade;();()!();`0`1!((sum;($["f"];`tp));(#:;`i)))
2009.09.24 2009.09.25 2009.09.28

"--- partition query: query, segment, partition, result ---"
(`trade;();()!();`0`1!((sum;($["f"];`tp));(#:;`i)))
`:.
2009.09.24
+`0`1!(,1.419538e+10;,27914938)

"--- partition query: query, segment, partition, result ---"
(`trade;();()!();`0`1!((sum;($["f"];`tp));(#:;`i)))
`:.
2009.09.25
+`0`1!(,1.419318e+10;,24485503)

"--- partition query: query, segment, partition, result ---"
(`trade;();()!();`0`1!((sum;($["f"];`tp));(#:;`i)))
`:.
2009.09.28
+`0`1!(,1.388645e+10;,20162485)

"--- query plan result ---"
(+`0`1!(,1.419538e+10;,27914938);+`0`1!(,1.419318e+10;,24485503);
+`0`1!(,1.388645e+10;,20162485))

"--- final result ---"
tp
--------
582.5979

Note first that `0 and `1 are used as intermediate columns. Observe that avg tp is decomposed into a map step

`0:sum "f"$tp and `1:count i

and a reduce step

tp:(sum `0)%sum `1.

Also observe that expressions are displayed in their parse tree format and that count is expressed as its k equivalent: the monadic form of #.
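
You can reproduce this display form yourself, since parse returns the parse tree of a q expression:

    parse "avg tp"      / the list (avg;`tp)
    parse "count i"     / (#:;`i), count in its k form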

The next example uses par.txt to segment the data into four segments: (/d/d1/data, /d/d2/data, /d/d3/data, /d/d4/data). It also uses a by date clause that eliminates the need to break up the query into map and reduce components (our data is partitioned by date). For clarity, we execute the query only on the last 16 partitions as well as on a subset of symbols. Observe that the "query plan" contains more information than that of the previous example: the query, a list of all partitions, a list of all segments, and a nested list of partitions belonging to segments.

q)select avg tp by date from trade where date in -16#date,sym in syms

"--- query plan: segments/partitions ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
2009.08.18 2009.08.19 2009.08.20 2009.08.21 2009.08.24 2009.08.25
2009.08.26 2009.08.27 2009.08.28 2009.08.31 2009.09.01 2009.09.02
2009.09.03 2009.09.04 2009.09.08 2009.09.09
`:/d/d1/data`:/d/d2/data`:/d/d3/data`:/d/d4/data
((`:/d/d1/data;2009.08.21 2009.08.25 2009.09.02);
(`:/d/d2/data;2009.08.18 2009.08.26 2009.09.03);
(`:/d/d3/data;2009.08.19 2009.08.27 2009.08.31 2009.09.04 2009.09.08);
(`:/d/d4/data;2009.08.20 2009.08.24 2009.08.28 2009.09.01 2009.09.09))

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.08.21
+(,`tp)!,,15.42632

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.08.25
+(,`tp)!,,15.04996

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.09.02
+(,`tp)!,,14.16648

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d2/data
2009.08.18
+(,`tp)!,,14.16883

... (some output removed for brevity) ...

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.09.08
+(,`tp)!,,15.59198

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.20
+(,`tp)!,,15.2657

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.24
+(,`tp)!,,14.75603

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.28
+(,`tp)!,,14.37194

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.09.01
+(,`tp)!,,13.25797

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.09.09
+(,`tp)!,,14.98316

"--- query plan result ---"
(+(,`tp)!,,14.16883;+(,`tp)!,,15.05272;+(,`tp)!,,15.2657;+(,`tp)!,,15.42632;
+(,`tp)!,,14.75603;+(,`tp)!,,15.04996;+(,`tp)!,,15.69218;+(,`tp)!,,15.53095;
+(,`tp)!,,14.37194;+(,`tp)!,,14.32488;+(,`tp)!,,13.25797;+(,`tp)!,,14.16648;
+(,`tp)!,,15.58938;+(,`tp)!,,16.1427;+(,`tp)!,,15.59198;+(,`tp)!,,14.98316)

"--- final result ---"
date      | tp
----------| --------
2009.08.18| 14.16883
2009.08.19| 15.05272
2009.08.20| 15.2657
2009.08.21| 15.42632
2009.08.24| 14.75603
2009.08.25| 15.04996
2009.08.26| 15.69218
2009.08.27| 15.53095
2009.08.28| 14.37194
2009.08.31| 14.32488
2009.09.01| 13.25797
2009.09.02| 14.16648
2009.09.03| 15.58938
2009.09.04| 16.1427
2009.09.08| 15.59198
2009.09.09| 14.98316

The above example was run with no slaves—i.e., on a single-threaded q process. Consequently, the queries run sequentially across segments and partitions.

Now observe what happens when we match the number of slaves to the number of segments in our database by invoking q with -s 4.

q)select avg tp by date from trade where date in -16#date,sym in syms

"--- query plan: segments/partitions ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
2009.08.18 2009.08.19 2009.08.20 2009.08.21 2009.08.24 2009.08.25
2009.08.26 2009.08.27 2009.08.28 2009.08.31 2009.09.01 2009.09.02
2009.09.03 2009.09.04 2009.09.08 2009.09.09
`:/d/d1/data`:/d/d2/data`:/d/d3/data`:/d/d4/data
((`:/d/d1/data;2009.08.21 2009.08.25 2009.09.02);
(`:/d/d2/data;2009.08.18 2009.08.26 2009.09.03);
(`:/d/d3/data;2009.08.19 2009.08.27 2009.08.31 2009.09.04 2009.09.08);
(`:/d/d4/data;2009.08.20 2009.08.24 2009.08.28 2009.09.01 2009.09.09))

"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"

(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.08.21
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.20
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.08.19
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d2/data
2009.08.18
+(,`tp)!,,15.55121

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.08.27
+(,`tp)!,,15.47055
+(,`tp)!,,15.21819

"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d2/data
2009.08.26
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.24
+(,`tp)!,,14.81711
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.08.25
+(,`tp)!,,14.92875

"--- partition query: query, segment, partition, result ---"
+(,`tp)!,,16.07275

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.08.31
+(,`tp)!,,15.55499
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.28

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d2/data
2009.09.03
+(,`tp)!,,13.43061
+(,`tp)!,,15.29159
+(,`tp)!,,12.64993
"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
`:/d/d4/data
2009.09.04
2009.09.01
+(,`tp)!,,176.6311
`:/d/d1/data
2009.09.02
+(,`tp)!,,151.7784
+(,`tp)!,,13.67089

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.09.09
+(,`tp)!,,179.799

"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.09.08
+(,`tp)!,,193.7031
+(,`tp)!,,48.75286

"--- query plan result ---"
(+(,`tp)!,,15.47055;+(,`tp)!,,15.55121;+(,`tp)!,,15.21819;+(,`tp)!,,14.81711;
+(,`tp)!,,16.07275;+(,`tp)!,,15.29159;+(,`tp)!,,15.55499;+(,`tp)!,,14.92875;
+(,`tp)!,,12.64993;+(,`tp)!,,13.43061;+(,`tp)!,,13.67089;+(,`tp)!,,151.7784;
+(,`tp)!,,176.6311;+(,`tp)!,,179.799;+(,`tp)!,,193.7031;+(,`tp)!,,48.75286)

"--- final result ---"
date      | tp
----------| --------
2009.08.18| 15.47055
2009.08.19| 15.55121
2009.08.20| 15.21819
2009.08.21| 14.81711
2009.08.24| 16.07275
2009.08.25| 15.29159
2009.08.26| 15.55499
2009.08.27| 14.92875
2009.08.28| 12.64993
2009.08.31| 13.43061
2009.09.01| 13.67089
2009.09.02| 151.7784
2009.09.03| 176.6311
2009.09.04| 179.799
2009.09.08| 193.7031
2009.09.09| 48.75286

Now the output from four slaves running concurrently is interleaved. One slave executes for each segment, with each slave executing the query on its segment sequentially across its partitions. An appropriately configured server can take advantage of available I/O bandwidth to speed up query execution using segments and slaves.


Prev: 1.3 Partitioned Tables Next: 1.5 Using .Q Utilities for Splayed and Partitioned Tables
