Data

The Data module is responsible for the persistent storage and representation of OHLCV (Open, High, Low, Close, Volume) data.

The primary backend is Zarr, which, like Feather or Parquet, optimizes for columnar data, or more generally, arrays. Zarr is simpler and allows for different encoding schemes. It supports compression by default and can be backed by various storage layers, including network-based ones. Compared to NoSQL databases, columnar storage has the drawback that queries must read whole chunks. However, since we are almost always interested in time series rather than scalar values, the latency cost is negligible.

We wrap a Zarr subtype of AbstractStore in a Planar.Data.ZarrInstance. The module holds a global ZarrInstance at Data.zi[]. The default store relies on LMDB. OHLCV data is organized by exchange, pair, and timeframe (see Planar.Data.key_path).

There are several ways to collect data:

Using the Scrapers module

Currently, there is support for Binance and Bybit archives.

using Scrapers: Scrapers as scr, BinanceData as bn
# Download klines for ETH; the default `market` is `:um` (USD-M futures)
bn.binancedownload("eth", market=:data, freq=:monthly, kind=:klines)
# Load them
bn.binanceload("eth", market=:data, freq=:monthly, kind=:klines)

# Show all symbols that can be downloaded
bn.binancesyms(market=:data)
# load/download also accept `quote_currency` to filter by (default `usdt`)
scr.selectsyms(["eth"], bn.binancesyms(market=:data), quote_currency="usdc")
Downloads are cached

Downloading the same pair again only fetches newer archives. If the data gets corrupted, pass reset=true to download it from scratch.
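Reusing the download call from above, a cache reset looks like this:

```julia
using Scrapers: BinanceData as bn

# Re-download ETH monthly klines from scratch, ignoring previously cached archives
bn.binancedownload("eth", market=:data, freq=:monthly, kind=:klines, reset=true)
```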

Using the Fetch module

The Fetch module downloads data directly from the exchange using ccxt.

using TimeTicks
using Exchanges
using Fetch: Fetch as fe

exc = getexchange!(:kucoin)
timeframe = tf"1m"
pairs = ("BTC/USDT", "ETH/USDT")
# Fetches the last 1000 candles; `to` can also be passed to download a specific range
fe.fetch_ohlcv(exc, timeframe, pairs; from=-1000) # or `fetch_candles` for unchecked data

Fetching directly from exchanges is not recommended for smaller timeframes since they are heavily rate-limited. Archives are a better option.
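As a hedged sketch of fetching a specific historical range, `from` and `to` can bound the request (here assuming both accept DateTime values, as produced by the `dt` string macro):

```julia
using TimeTicks
using Exchanges
using Fetch: Fetch as fe

exc = getexchange!(:kucoin)
# Fetch one month of 1h candles for a single pair;
# `from`/`to` bound the requested range (DateTime values assumed accepted)
fe.fetch_ohlcv(exc, tf"1h", ("BTC/USDT",); from=dt"2023-01-01", to=dt"2023-02-01")
```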

Using Watchers

With the Watchers module, you can track live data from exchanges or other data sources and store it locally. Implemented are watchers that track OHLCV:

using Exchanges
using Planar.Watchers: Watchers as wc, WatchersImpls as wi
exc = getexchange!(:kucoin)

w = wi.ccxt_ohlcv_tickers_watcher(exc)
wc.start!(w)
>>> w
17-element Watchers.Watcher20{Dict{String, NamedTup...Nothing, Float64}, Vararg{Float64, 7}}}}}
Name: ccxt_ohlcv_ticker
Intervals: 5 seconds(TO), 5 seconds(FE), 6 minutes(FL)
Fetched: 2023-03-07T12:06:18.690 busy: true
Flushed: 2023-03-07T12:04:31.472
Active: true
Attempts: 0

As a convention, a watcher's view property exposes the processed data. In this case, the candles processed by the ohlcv_ticker_watcher are stored in a dict keyed by symbol.

>>> w.view
Dict{String, DataFrames.DataFrame} with 220 entries:
  "HOOK/USDT"          => 5×6 DataFrame…
  "ETH/USD:USDC"       => 5×6 DataFrame…
  "PEOPLE/USDT:USDT"   => 5×6 DataFrame…

There is another OHLCV watcher, based on trades, that tracks only one pair at a time.

w = wi.ccxt_ohlcv_watcher(exc, "BTC/USDT:USDT"; timeframe=tf"1m")
w.view
956×6 DataFrame
 Row │ timestamp            open     high     low      close    volume  
     │ DateTime             Float64  Float64  Float64  Float64  Float64 
─────┼──────────────────────────────────────────────────────────────────
...

Other implemented watchers are the orderbook watcher, and watchers that parse data feeds from 3rd party APIs.

Other sources

Assuming you have your own pipeline for fetching candles, you can use the functions Planar.Data.save_ohlcv and Planar.Data.load_ohlcv to manage the data. Saving is easiest if you pass a standard OHLCV dataframe; otherwise, you need to provide a saved_col argument indicating the column index to use as the timestamp column (or use lower-level functions).

using Planar
@environment!
@assert da === Data
source_name = "mysource"
pair = "BTC123/USD"
timeframe = "1m"
zi = Data.zi # the global zarr instance, or use your own
mydata = my_custom_data_loader()
da.save_ohlcv(zi, source_name, pair, timeframe, mydata)

To load the data back:

da.load_ohlcv(zi, source_name, pair, timeframe)

Data is returned as a DataFrame with open, high, low, close, volume, and timestamp columns. Since these save/load functions require a timestamp column, they check that the index is contiguous: it must not have missing timestamps with respect to the subject timeframe. These checks can be disabled by passing check=:none.
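For instance, if a source legitimately has gaps (a hypothetical scenario), the contiguity check can be skipped, reusing the names from the example above:

```julia
using Planar
@environment!

# `da` is the Data module alias set up by `@environment!`;
# skip the contiguity check when saving data known to have gaps
# (`mydata` stands in for your own gap-y OHLCV dataframe)
da.save_ohlcv(Data.zi, "mysource", "BTC123/USD", "1m", mydata; check=:none)
# the same keyword applies on load
da.load_ohlcv(Data.zi, "mysource", "BTC123/USD", "1m")
```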

If you want to save other kinds of data, use the Planar.Data.save_data and Planar.Data.load_data functions. Unlike the OHLCV functions, these don't check for contiguity, so it is possible to store sparse data. The data still requires a timestamp column, however, because saved data can either be prepended or appended, so an index must be available to maintain order. While OHLCV data requires a concrete element type for storage (Float64 by default), generic data can either be saved with a shared type or serialized. To serialize data while saving, pass serialize=true to save_data; to load serialized data, pass serialized=true to load_data.
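A sketch of storing sparse, non-OHLCV records with serialization; the key arguments are assumed to mirror the OHLCV functions (the exact positional signature may differ):

```julia
using Planar
@environment!
using DataFrames

# Sparse records; a timestamp column is still required for ordering
events = DataFrame(;
    timestamp=[dt"2023-01-01", dt"2023-01-15"],
    payload=[Dict("status" => "ok"), Dict("status" => "halted")],
)
# Serialize heterogeneous values instead of forcing a shared concrete type
da.save_data(Data.zi, "mysource", "events", events; serialize=true)
da.load_data(Data.zi, "mysource", "events"; serialized=true)
```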

When loading data from storage, you can work with the ZArray directly by passing raw=true to load_ohlcv, or as_z=true or with_z=true to load_data. Managing the array directly lets you avoid materializing the entire dataset, which matters when dealing with large amounts of data.
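A hedged sketch of reading only the tail of a large dataset through the raw array (standard Zarr slicing semantics assumed, where only the chunks covering the requested range are fetched):

```julia
using Planar
@environment!

# Get the backing ZArray instead of a materialized DataFrame
z = da.load_ohlcv(Data.zi, "mysource", "BTC123/USD", "1m"; raw=true)
# Read only the last 1000 rows; slicing fetches just the chunks
# that cover the requested range
tail = z[end-999:end, :]
```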

Indexing

The Data module implements dataframe indexing by dates, so rows can be accessed conveniently:

df[dt"2020-01-01", :high] # get the high of the date 2020-01-01
df[dtr"2020-..2021-", [:high, :low]] # get all high and low for the year 2020
after(df, dt"2020-01-01") # get all candles after the date 2020-01-01
before(df, dt"2020-01-01") # get all candles up until the date 2020-01-01

With OHLCV data, you can query the timeframe of the series directly from the dataframe by calling timeframe!(df). This either returns the previously set timeframe or infers it from the timestamp column. You can set the timeframe by calling, e.g., timeframe!(df, tf"1m"), or timeframe!! to overwrite it.
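In code, the calls above look like this, reusing a dataframe loaded earlier:

```julia
using Planar
@environment!

df = da.load_ohlcv(Data.zi, "mysource", "BTC123/USD", "1m")
timeframe!(df)          # returns the stored timeframe, or infers it from timestamps
timeframe!(df, tf"1m")  # sets the timeframe
timeframe!!(df, tf"1m") # force-overwrites a previously set timeframe
```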

Caching

Data.Cache.save_cache and Data.Cache.load_cache can be used to store generic metadata such as JSON payloads. The data is saved in the Planar data directory, which is located under XDG_CACHE_DIR[1] if set, or $HOME/.cache by default.
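A minimal sketch, assuming save_cache takes a string key plus the value and load_cache retrieves it by the same key (the exact signatures may differ):

```julia
using Planar
@environment!

# Store a generic JSON-like payload under a key
Data.Cache.save_cache("my_metadata", Dict("last_sync" => "2023-03-07T12:00:00"))
# ...and read it back later
meta = Data.Cache.load_cache("my_metadata")
```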

[1] The default path might be a scratchspace (from Scratch.jl) in the future.