# Assembly "ref-data"
#
# Defines two tables (trade, markets), three mounts (rdb, idb, hdb) and the
# elements that use them. The `sp` element runs two pipelines:
#   * scrapper   - downloads market identifier codes and writes them to the
#                  `markets` table
#   * trade-feed - generates random trades and writes them to the `trade` table
# trade.code is declared as a foreign key into markets.code.
apiVersion: insights.kx.com/v1
kind: Assembly
metadata:
  name: ref-data
  labels:
    env: dev
spec:
  labels:
    type: basic
  tables:
    trade:
      description: Trade data
      type: partitioned
      blockSize: 10000
      prtnCol: time
      sortColsOrd: [sym]
      sortColsDisk: [sym]
      columns:
        - name: sym
          description: trade symbol
          type: symbol
          attrMem: grouped
          attrDisk: parted
          attrOrd: parted
        - name: code
          type: symbol
          # Links each trade to a row of the markets table defined below.
          foreign: markets.code
          description: Code for the market the stock was exchanged on
        - name: price
          type: float
        - name: time
          description: timestamp
          type: timestamp
    markets:
      description: reference market data
      type: splayed
      updTsCol: updateTS
      primaryKeys:
        - code
      columns:
        - name: code
          type: symbol
          description: Market code
        - name: opCode
          # NOTE(review): the scrapper pipeline parses every CSV column as a
          # symbol and only casts `site` to string, and its mock fallback also
          # builds opCode as symbols - confirm whether `string` is intended here.
          type: string
          description: Market operating (parent) code
        - name: site
          type: string
          description: Market website
        - name: updateTS
          description: Timestamp of last mutation
          type: timestamp
  mounts:
    rdb:
      type: stream
      baseURI: none
      partition: none
      dependency:
        - idb
    idb:
      type: local
      baseURI: file:///data/db/idb
      partition: ordinal
    hdb:
      type: local
      baseURI: file:///data/db/hdb
      partition: date
      dependency:
        - idb
  elements:
    sp:
      description: Stream for scraping market identifier codes
      pipelines:
        # Reads from the `north` stream config and writes to `south`; the q
        # spec below publishes market reference data on a timer.
        scrapper:
          protectedExecution: false
          source: north
          destination: south
          spec: |-
            // Download the ISO 10383 market identifier code CSV and return a
            // table with columns code, opCode and site.
            // Signals an error when the HTTP status is not 200 so that the
            // caller's error trap (onDownloadError) can supply a fallback.
            getMarketIdentifierCodes:{[]
                -1 "Downloading Market Identifier Codes";
                resp:.kurl.sync ("https://www.iso20022.org/sites/default/files/ISO10383_MIC/ISO10383_MIC.csv";`GET;()!());
                if[200i <> resp 0;
                    -2 "Downloading data response code is: ", string resp 0;
                    -2 "Reason: ", resp 1;
                    // Must be its own statement: without the preceding `;`
                    // the signal fuses with the log expression above.
                    '"Failed to download market identifier codes";
                    ];
                -1 "Parsing market identifier codes from CSV";
                // 12 symbol columns, comma-delimited, first row is the header
                t:(12#"S";enlist ",") 0: "\r\n" vs last resp;
                // Rename cols so they are q friendly
                t:`country`iso`code`opCode`os`institution`acronym`city`site`statusDate`status`creationDate xcol t;
                // For brevity, we only save a few columns.
                -1 "Returning code + opCode + site data";
                :select code, opCode, site:string site from t;
                };

            // Error handler for the download: logs the error text x and
            // returns a two-row mock table with the same columns.
            onDownloadError:{[x]
                -2 "Failed to download initial market identifier codes. ",x,". Returning mock";
                :([] code:`XCHI`XNYS; opCode:`XNYS`XNYS; site:("WWW.NYSE.COM";"WWW.NYSE.COM"))
                };

            // Refreshing market codes info
            // Publishes the downloaded table, or the mock if the download fails.
            .pub.markets:{
                -1 string[.z.p]," reloading market reference data";
                pubMarkets @[getMarketIdentifierCodes;::;onDownloadError];
                };

            // Create timers to update markets codes
            .qsp.onStart {
                .tm.add[`markets;(`.pub.markets;::);14400000;0]; // Redownload every 4 hours
                };

            // Stamp each published batch with the current time and write it
            // to the markets table.
            .qsp.run
                .qsp.read.fromCallback[`pubMarkets]
                .qsp.map[{ update updateTS: .z.p from x }]
                .qsp.write.toStream[`markets]
        # Synthetic trade generator; publishes random rows on a timer.
        trade-feed:
          protectedExecution: false
          source: north
          destination: south
          spec: |-
            N:100; // Rows of random trade data per update

            // Append N random trades that reference random markets
            // Trades occur randomly on different branches of NYSE
            .pub.trades:{
                -1 string[.z.p]," publishing trades";
                pubTrades ([]
                    sym : N?`AAPL`MSFT`EBAY`SHOP;
                    code : N?`XNLI`NYSD`AMXO`ARCD`ARCO`XNYS`XCHI;
                    price : N?2000f
                    )};

            // Create timers to append trades
            .qsp.onStart {
                .tm.add[`trade;(`.pub.trades;::);5000;0]; // Add N rows every 5 seconds
                };

            // Stamp each published batch with the current time and write it
            // to the trade table.
            .qsp.run
                .qsp.read.fromCallback[`pubTrades]
                .qsp.map[{ update time: .z.p from x }]
                .qsp.write.toStream[`trade];
    sm:
      source: south
      # One tier per mount declared under spec.mounts.
      tiers:
        - name: streaming
          mount: rdb
        - name: interval
          mount: idb
          schedule:
            freq: 01:00:00
            snap: 00:00:00
        - name: recent
          mount: hdb
          schedule:
            freq: 1D00:00:00
            snap: 01:35:00
    dap:
      instances:
        idb:
          size: 1
          mountName: idb
        hdb:
          size: 1
          mountName: hdb
        rdb:
          size: 1
          mountName: rdb
          source: south
    sequencer:
      # `south` is the destination of both pipelines and the source for sm
      # and the rdb dap instance.
      south:
        external: false
        k8sPolicy:
          resources:
            requests:
              memory: "512Mi"
              cpu: 0.5
            limits:
              memory: "512Mi"
              cpu: 0.5
      # `north` is the source of both pipelines.
      north:
        k8sPolicy:
          resources:
            requests:
              memory: "512Mi"
              cpu: 0.5
            limits:
              memory: "512Mi"
              cpu: 0.5
        external: true
        topicConfig:
          subTopic: "data"