# Source code for hftbacktest.data.utils.binancehistmktdata

import csv
from typing import Optional

import numpy as np
from numpy.typing import NDArray

from .. import correct_event_order, validate_event_order
from ..validation import correct_local_timestamp
from ...types import (
    DEPTH_EVENT,
    DEPTH_SNAPSHOT_EVENT,
    TRADE_EVENT,
    BUY_EVENT,
    SELL_EVENT,
    event_dtype
)


def convert_snapshot(
        snapshot_filename: str,
        output_filename: Optional[str] = None,
        feed_latency: float = 0,
        has_header: Optional[bool] = None,
        ss_buffer_size: int = 1_000_000,
) -> NDArray:
    r"""
    Converts a Binance Historical Market Data snapshot file into a format compatible with
    HftBacktest.

    Since it doesn't have a local timestamp, it lacks feed latency information, which can
    result in a significant discrepancy between live and backtest results. Collecting feed
    data yourself or obtaining the high quality of data from a data vendor is strongly
    recommended.

    https://www.binance.com/en/landing/data

    Args:
        snapshot_filename: Snapshot filename
        output_filename: If provided, the converted data will be saved to the specified
                         filename in ``npz`` format.
        feed_latency: Artificial feed latency value to be added to the exchange timestamp
                      to create local timestamp.
        has_header: True if the given file has a header, it will automatically detect it if
                    set to None.
        ss_buffer_size: Preallocated row count for each side's snapshot buffer.

    Returns:
        Converted data compatible with HftBacktest.

    Raises:
        ValueError: If a row's column count does not match the (detected or assumed) header.
    """
    ss_bid = np.empty(ss_buffer_size, event_dtype)
    ss_ask = np.empty(ss_buffer_size, event_dtype)
    ss_bid_rn = 0
    ss_ask_rn = 0

    timestamp_col = None
    side_col = None
    price_col = None
    qty_col = None

    # Reads snapshot file
    print('Reading %s' % snapshot_filename)
    with open(snapshot_filename, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=',')
        for row in reader:
            if timestamp_col is None:
                # First row: detect/consume the header, or fall back to the documented
                # Binance column layout when the file has none.
                if has_header is None:
                    has_header = row[0] == 'symbol'
                if has_header:
                    header = row
                else:
                    header = [
                        'symbol',
                        'timestamp',
                        'trans_id',
                        'first_update_id',
                        'last_update_id',
                        'side',
                        'update_type',
                        'price',
                        'qty'
                    ]
                if len(header) != len(row):
                    raise ValueError('column count does not match the header')

                timestamp_col = header.index('timestamp')
                side_col = header.index('side')
                price_col = header.index('price')
                qty_col = header.index('qty')

                if has_header:
                    # The header row itself carries no data.
                    continue

            exch_ts = int(row[timestamp_col])
            # No local timestamp in the source data; synthesize one from the artificial latency.
            local_ts = exch_ts + feed_latency
            side = 1 if row[side_col] == 'b' else -1
            price = float(row[price_col])
            qty = float(row[qty_col])

            if side == 1:
                ss_bid[ss_bid_rn] = (
                    DEPTH_SNAPSHOT_EVENT | BUY_EVENT,
                    exch_ts,
                    local_ts,
                    price,
                    qty,
                    0,
                    0,
                    0
                )
                ss_bid_rn += 1
            else:
                ss_ask[ss_ask_rn] = (
                    DEPTH_SNAPSHOT_EVENT | SELL_EVENT,
                    exch_ts,
                    local_ts,
                    price,
                    qty,
                    0,
                    0,
                    0
                )
                ss_ask_rn += 1

    ss_bid = ss_bid[:ss_bid_rn]
    ss_ask = ss_ask[:ss_ask_rn]

    # Assemble: bids from best (highest) price downward, then asks from best (lowest)
    # price upward. Field index 3 is the price column of event_dtype; use slice
    # assignment — '+=' on a structured ndarray is arithmetic, not concatenation.
    snapshot = np.empty(len(ss_bid) + len(ss_ask), event_dtype)
    snapshot[:len(ss_bid)] = sorted(ss_bid, key=lambda v: -float(v[3]))
    snapshot[len(ss_bid):] = sorted(ss_ask, key=lambda v: float(v[3]))

    if output_filename is not None:
        np.savez(output_filename, data=snapshot)

    return snapshot
def convert(
        depth_filename: str,
        trades_filename: str,
        output_filename: Optional[str] = None,
        buffer_size: int = 100_000_000,
        feed_latency: float = 0,
        base_latency: float = 0,
        depth_has_header: Optional[bool] = None,
        trades_has_header: Optional[bool] = None
) -> NDArray:
    r"""
    Converts Binance Historical Market Data depth and trades files into a format compatible
    with HftBacktest.

    Since it doesn't have a local timestamp, it lacks feed latency information, which can
    result in a significant discrepancy between live and backtest results. Collecting feed
    data yourself or obtaining the high quality of data from a data vendor is strongly
    recommended.

    https://www.binance.com/en/landing/data

    Args:
        depth_filename: Depth data filename
        trades_filename: Trades data filename
        output_filename: If provided, the converted data will be saved to the specified
                         filename in ``npz`` format.
        buffer_size: Sets a preallocated row size for the buffer.
        feed_latency: Artificial feed latency value to be added to the exchange timestamp
                      to create local timestamp.
        base_latency: The value to be added to the feed latency.
                      See :func:`.correct_local_timestamp`.
        depth_has_header: True if the given file has a header, it will automatically detect
                          it if set to None.
        trades_has_header: True if the given file has a header, it will automatically
                           detect it if set to None.

    Returns:
        Converted data compatible with HftBacktest.

    Raises:
        ValueError: If a row's column count does not match the (detected or assumed) header.
    """
    tmp = np.empty(buffer_size, event_dtype)
    row_num = 0

    timestamp_col = None
    side_col = None
    price_col = None
    qty_col = None

    print('Reading %s' % depth_filename)
    with open(depth_filename, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=',')
        for row in reader:
            if timestamp_col is None:
                # First row: detect/consume the header, or fall back to the documented
                # Binance depth column layout when the file has none.
                if depth_has_header is None:
                    depth_has_header = row[0] == 'symbol'
                if depth_has_header:
                    header = row
                else:
                    header = [
                        'symbol',
                        'timestamp',
                        'trans_id',
                        'first_update_id',
                        'last_update_id',
                        'side',
                        'update_type',
                        'price',
                        'qty'
                    ]
                if len(header) != len(row):
                    raise ValueError('column count does not match the header')

                timestamp_col = header.index('timestamp')
                side_col = header.index('side')
                price_col = header.index('price')
                qty_col = header.index('qty')

                if depth_has_header:
                    # The header row itself carries no data.
                    continue

            exch_ts = int(row[timestamp_col])
            # No local timestamp in the source data; synthesize one from the artificial latency.
            local_ts = exch_ts + feed_latency
            px = float(row[price_col])
            qty = float(row[qty_col])

            # Insert DEPTH_EVENT
            tmp[row_num] = (
                DEPTH_EVENT | (BUY_EVENT if row[side_col] == 'b' else SELL_EVENT),
                exch_ts,
                local_ts,
                px,
                qty,
                0,
                0,
                0
            )
            row_num += 1

    # Reset the column indices; the trades file has a different layout.
    timestamp_col = None
    side_col = None
    price_col = None
    qty_col = None

    print('Reading %s' % trades_filename)
    with open(trades_filename, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=',')
        for row in reader:
            if timestamp_col is None:
                if trades_has_header is None:
                    trades_has_header = row[0] == 'id'
                if trades_has_header:
                    header = row
                else:
                    header = [
                        'id',
                        'price',
                        'qty',
                        'quote_qty',
                        'time',
                        'is_buyer_maker'
                    ]
                if len(header) != len(row):
                    raise ValueError('column count does not match the header')

                timestamp_col = header.index('time')
                side_col = header.index('is_buyer_maker')
                price_col = header.index('price')
                qty_col = header.index('qty')

                if trades_has_header:
                    continue

            exch_ts = int(row[timestamp_col])
            local_ts = exch_ts + feed_latency
            px = float(row[price_col])
            qty = float(row[qty_col])

            # Insert TRADE_EVENT. The CSV field is the *string* 'true'/'false', so it must
            # be compared explicitly — a bare truthiness test would mark every trade SELL.
            # is_buyer_maker == true means the taker (trade initiator) was the seller.
            tmp[row_num] = (
                TRADE_EVENT | (SELL_EVENT if row[side_col].lower() == 'true' else BUY_EVENT),
                exch_ts,
                local_ts,
                px,
                qty,
                0,
                0,
                0
            )
            row_num += 1

    tmp = tmp[:row_num]

    print('Correcting the latency')
    tmp = correct_local_timestamp(tmp, base_latency)

    print('Correcting the event order')
    data = correct_event_order(
        tmp,
        np.argsort(tmp['exch_ts'], kind='mergesort'),
        np.argsort(tmp['local_ts'], kind='mergesort')
    )

    validate_event_order(data)

    if output_filename is not None:
        print('Saving to %s' % output_filename)
        np.savez_compressed(output_filename, data=data)

    return data