import gzip
import json
from typing import Optional, Literal
import numpy as np
from numpy.typing import NDArray
from ..validation import correct_event_order, correct_local_timestamp, validate_event_order
from ...types import (
DEPTH_EVENT,
DEPTH_CLEAR_EVENT,
DEPTH_SNAPSHOT_EVENT,
TRADE_EVENT,
BUY_EVENT,
SELL_EVENT,
event_dtype
)
[docs]
def convert(
        input_filename: str,
        output_filename: Optional[str] = None,
        opt: Literal['', 'm', 't', 'mt'] = '',
        base_latency: float = 0,
        combined_stream: bool = True,
        buffer_size: int = 100_000_000
) -> NDArray:
    r"""
    Converts raw Binance Futures feed stream file into a format compatible with HftBacktest.
    If you encounter an ``IndexError`` due to an out-of-bounds, try increasing the ``buffer_size``.

    The input is a gzip-compressed text file with one message per line. Blank lines are skipped.
    Each line is parsed as a fixed-width 19-character integer local timestamp, one separator character, and a JSON
    document. Exchange timestamps taken from the JSON (Binance reports them in milliseconds) are multiplied by
    ``1_000_000``.

    Messages that carry no stream payload are treated as follows: if the message has a ``code`` key it is regarded as an
    error response and ``code`` and ``msg`` are printed; otherwise it is regarded as a market depth snapshot with ``T``,
    ``bids`` and ``asks`` keys, and is converted into depth clear events followed by depth snapshot events.

    **File Format:**

    NOTE(review): the sample rows below illustrate the column layout only. Their timestamps are 16 digits long, while
    the parser reads exactly the first 19 characters of a line as the local timestamp, so real input needs a 19-digit
    timestamp (presumably nanoseconds, matching the scaled exchange timestamps) -- confirm against the collector.

    .. code-block::

        local_timestamp raw_stream
        1660228023037049 {"stream":"btcusdt@depth@0ms","data":{"e":"depthUpdate","E":1660228023941,"T":1660228023931,"s":"BTCUSDT","U":1801732831593,"u":1801732832589,"pu":1801732831561,"b":[["2467.10","0.000"],["12006.00","0.001"],["24427.70","4.350"],["24620.30","0.172"],["24644.00","44.832"],["24645.40","0.203"],["24652.80","4.900"],["24664.10","4.279"],["24666.50","0.554"],["24666.80","6.764"],["24668.70","7.428"],["24670.90","2.000"],["24671.00","0.000"],["24672.70","0.000"],["24688.30","0.000"]],"a":[["24653.60","0.000"],["24669.80","0.000"],["24670.20","0.000"],["24670.70","0.000"],["24670.90","0.000"],["24671.00","20.812"],["24672.10","0.000"],["24672.30","0.001"],["24674.60","1.520"],["24674.80","0.000"],["24684.20","4.519"],["24684.30","0.202"],["24685.00","0.937"],["24690.90","4.827"],["24693.60","1.500"],["24729.10","0.171"]]}}
        1660228023038319 {"stream":"btcusdt@depth@0ms","data":{"e":"depthUpdate","E":1660228023977,"T":1660228023966,"s":"BTCUSDT","U":1801732832805,"u":1801732834115,"pu":1801732832589,"b":[["2467.10","0.008"],["24643.00","4.457"],["24656.30","0.010"],["24657.70","0.005"],["24658.80","1.000"],["24658.90","1.500"],["24659.50","3.781"],["24659.70","1.806"],["24659.90","0.105"],["24660.60","0.787"],["24666.30","5.033"],["24666.40","0.012"],["24666.50","0.556"],["24668.70","7.426"],["24668.90","0.000"],["24670.90","2.535"],["24680.00","0.000"],["24688.30","0.000"]],"a":[["24653.60","0.000"],["24670.10","0.000"],["24670.60","0.000"],["24670.70","0.000"],["24670.90","0.000"],["24671.00","20.642"],["24672.00","0.000"],["24672.10","0.000"],["24673.50","0.145"],["24673.60","1.567"],["24674.50","3.746"],["24674.60","1.520"],["24678.30","1.304"],["24678.40","0.001"],["24678.80","0.546"],["24678.90","0.002"],["24681.60","0.020"],["24681.70","0.613"],["24681.90","0.077"],["24682.10","3.000"],["24682.20","0.000"],["24683.70","0.163"],["24683.80","4.162"],["24684.00","1.227"],["24684.20","4.519"],["24684.30","0.202"],["24684.90","1.331"],["24685.70","0.156"],["24685.80","0.325"],["24686.70","0.648"],["24692.60","0.040"],["24700.00","47.420"],["24729.10","0.006"]]}}
        1660228023043260 {"stream":"btcusdt@trade","data":{"e":"trade","E":1660228023980,"T":1660228023973,"s":"BTCUSDT","t":2691833663,"p":"24670.90","q":"0.022","X":"MARKET","m":true}}
        1660228023052991 {"stream":"btcusdt@trade","data":{"e":"trade","E":1660228023991,"T":1660228023983,"s":"BTCUSDT","t":2691833664,"p":"24671.00","q":"0.001","X":"MARKET","m":false}}
        1660228023071108 {"stream":"btcusdt@depth@0ms","data":{"e":"depthUpdate","E":1660228024010,"T":1660228024002,"s":"BTCUSDT","U":1801732834136,"u":1801732835323,"pu":1801732834115,"b":[["2467.10","0.000"],["12006.00","0.000"],["24599.40","0.641"],["24603.20","0.104"],["24625.50","0.152"],["24645.20","0.476"],["24646.80","0.081"],["24652.60","0.254"],["24664.10","4.279"],["24666.50","0.878"],["24668.80","0.004"],["24670.90","2.513"],["24688.30","0.000"],["24787.00","0.000"]],"a":[["24653.60","0.000"],["24668.10","0.000"],["24668.70","0.000"],["24669.50","0.000"],["24669.80","0.000"],["24670.00","0.000"],["24670.60","0.000"],["24670.70","0.000"],["24670.90","0.000"],["24671.00","20.641"],["24672.20","0.000"],["24672.30","0.001"],["24673.50","0.040"],["24673.90","0.105"],["24674.70","2.139"],["24674.80","0.000"],["24683.70","0.963"],["24683.90","0.009"],["24685.70","0.556"],["24709.30","0.254"],["24723.80","0.000"],["24728.30","0.193"],["24729.50","4.477"],["24739.40","0.807"],["24743.20","0.235"],["24795.00","0.130"]]}}
        1660228023117894 {"stream":"btcusdt@depth@0ms","data":{"e":"depthUpdate","E":1660228024044,"T":1660228024034,"s":"BTCUSDT","U":1801732835406,"u":1801732836571,"pu":1801732835323,"b":[["2467.10","0.000"],["24337.10","2.462"],["24616.50","1.050"],["24619.00","0.235"],["24640.00","5.148"],["24649.80","2.805"],["24650.00","14.374"],["24651.90","3.000"],["24653.30","1.400"],["24658.70","1.142"],["24658.80","0.000"],["24659.60","3.263"],["24659.70","0.006"],["24660.50","0.840"],["24660.60","0.387"],["24662.20","0.202"],["24663.10","7.147"],["24664.00","0.922"],["24664.20","0.131"],["24664.50","0.027"],["24666.20","7.066"],["24666.40","0.012"],["24668.80","0.002"],["24669.30","0.002"],["24670.20","0.811"],["24670.90","5.817"],["24688.30","0.000"]],"a":[["24653.60","0.000"],["24669.80","0.000"],["24669.90","0.000"],["24670.90","0.000"],["24671.00","20.121"],["24672.10","0.000"],["24672.80","0.000"],["24674.60","1.520"],["24675.30","0.421"],["24681.20","0.239"],["24681.50","1.343"],["24681.60","0.020"],["24681.70","0.213"],["24683.60","2.929"],["24683.70","0.163"],["24683.80","2.162"],["24684.70","0.646"],["24684.90","0.731"],["24692.90","0.321"],["24693.10","0.040"],["24700.70","0.537"],["24703.60","0.210"],["24721.50","7.245"]]}}
        1660228023125009 {"stream":"btcusdt@trade","data":{"e":"trade","E":1660228024062,"T":1660228024055,"s":"BTCUSDT","t":2691833665,"p":"24670.90","q":"0.002","X":"MARKET","m":true}}
        1660228023128966 {"stream":"btcusdt@trade","data":{"e":"trade","E":1660228024067,"T":1660228024061,"s":"BTCUSDT","t":2691833666,"p":"24670.90","q":"0.020","X":"MARKET","m":true}}
        1660228023138740 {"stream":"btcusdt@depth@0ms","data":{"e":"depthUpdate","E":1660228024077,"T":1660228024066,"s":"BTCUSDT","U":1801732836639,"u":1801732837803,"pu":1801732836571,"b":[["2467.10","0.000"],["24659.00","0.000"],["24659.30","2.500"],["24663.00","1.038"],["24664.20","0.118"],["24666.20","7.065"],["24666.50","0.554"],["24666.70","3.987"],["24666.80","7.088"],["24666.90","0.014"],["24667.40","1.506"],["24668.90","0.006"],["24670.10","0.272"],["24670.90","6.726"],["24688.30","0.000"]],"a":[["24653.60","0.000"],["24668.70","0.000"],["24670.30","0.000"],["24670.50","0.000"],["24670.90","0.000"],["24679.00","0.001"],["24703.10","1.500"],["24710.50","0.057"],["24728.30","0.028"],["24768.50","0.318"],["24980.10","5.446"],["25050.00","119.300"]]}}
        1660228023149748 {"stream":"btcusdt@trade","data":{"e":"trade","E":1660228024088,"T":1660228024081,"s":"BTCUSDT","t":2691833667,"p":"24671.00","q":"0.063","X":"MARKET","m":false}}

    Args:
        input_filename: Input filename with path.
        output_filename: If provided, the converted data will be saved to the specified filename in ``npz`` format.
        opt: Additional processing options. The letters can be combined (``mt`` enables both):

            - ``m``: Processes ``markPriceUpdate`` stream with the following custom event IDs. The value is stored in
              the price field and the quantity field is ``0``. The event time (``E``) is used as the exchange
              timestamp.

                - index: ``100``
                - mark price: ``101``
                - funding rate: ``102``
            - ``t``: Processes ``bookTicker`` stream with the following custom event IDs.

                - best bid: ``103``
                - best ask: ``104``
        base_latency: The value to be added to the feed latency.
                      See :func:`.correct_local_timestamp`.
        combined_stream: Raw stream type.

            combined stream:

            .. code-block::

                {"stream":"solusdt@bookTicker","data":{"e":"bookTicker","u":4456408609867,"s":"SOLUSDT","b":"142.4440","B":"50","a":"142.4450","A":"3","T":1713571200009,"E":1713571200010}}

            regular stream:

            .. code-block::

                {"e":"bookTicker","u":4456408609867,"s":"SOLUSDT","b":"142.4440","B":"50","a":"142.4450","A":"3","T":1713571200009,"E":1713571200010}
        buffer_size: Sets a preallocated row size for the buffer.

    Returns:
        Converted data compatible with HftBacktest.

    Raises:
        IndexError: If the input produces more rows than ``buffer_size``.
    """
    # Width of the local-timestamp prefix of each line; one separator character follows it.
    timestamp_slice = 19
    # Factor applied to the millisecond exchange timestamps found in the JSON payload.
    timestamp_mul = 1000000

    # Custom event IDs emitted for the optional streams (see ``opt`` in the docstring).
    index_event = 100
    mark_price_event = 101
    funding_rate_event = 102
    best_bid_event = 103
    best_ask_event = 104

    # The option flags never change while reading, so evaluate them once.
    with_mark_price = 'm' in opt
    with_book_ticker = 't' in opt

    tmp = np.empty(buffer_size, event_dtype)
    row_num = 0
    with gzip.open(input_filename, 'r') as f:
        for line in f:
            # Tolerate blank lines (e.g. a stray trailing newline) instead of failing in int().
            if not line.strip():
                continue

            local_timestamp = int(line[:timestamp_slice])
            message = json.loads(line[timestamp_slice + 1:])

            if combined_stream:
                data = message.get('data')
            elif 'e' in message:
                data = message
            else:
                # A regular-stream file can also hold snapshots and error responses, which have no event type.
                # They must take the same path as in the combined-stream case instead of failing on data['e'].
                data = None

            if data is None:
                if 'code' in message:
                    print(message['code'], message['msg'])
                    continue

                # snapshot
                exch_timestamp = int(message['T']) * timestamp_mul
                sides = ((BUY_EVENT, message['bids']), (SELL_EVENT, message['asks']))
                for side, levels in sides:
                    if not levels:
                        continue
                    # clears the existing market depth upto the last price in the snapshot.
                    row_num = _put_row(
                        tmp, row_num, DEPTH_CLEAR_EVENT | side,
                        exch_timestamp, local_timestamp, float(levels[-1][0]), 0
                    )
                    # inserts the snapshot.
                    for px, qty in levels:
                        row_num = _put_row(
                            tmp, row_num, DEPTH_SNAPSHOT_EVENT | side,
                            exch_timestamp, local_timestamp, float(px), float(qty)
                        )
                continue

            evt = data['e']
            if evt == 'trade':
                # Only trades whose 'X' field is 'MARKET' are converted.
                if data['X'] != 'MARKET':
                    continue
                exch_timestamp = int(data['T']) * timestamp_mul
                # 'm' is set when the buyer is the maker, so the trade initiator's side is the seller.
                side = SELL_EVENT if data['m'] else BUY_EVENT
                row_num = _put_row(
                    tmp, row_num, TRADE_EVENT | side,
                    exch_timestamp, local_timestamp, float(data['p']), float(data['q'])
                )
            elif evt == 'depthUpdate':
                exch_timestamp = int(data['T']) * timestamp_mul
                for px, qty in data['b']:
                    row_num = _put_row(
                        tmp, row_num, DEPTH_EVENT | BUY_EVENT,
                        exch_timestamp, local_timestamp, float(px), float(qty)
                    )
                for px, qty in data['a']:
                    row_num = _put_row(
                        tmp, row_num, DEPTH_EVENT | SELL_EVENT,
                        exch_timestamp, local_timestamp, float(px), float(qty)
                    )
            elif evt == 'markPriceUpdate' and with_mark_price:
                # In the mark price stream 'T' is the next funding time, not the time of this update, so the event
                # time 'E' is the exchange timestamp. Using 'T' would stamp these rows in the future and distort the
                # latency and event-order correction below.
                exch_timestamp = int(data['E']) * timestamp_mul
                values = (
                    (index_event, data['i']),
                    (mark_price_event, data['p']),
                    (funding_rate_event, data['r']),
                )
                for ev, value in values:
                    row_num = _put_row(
                        tmp, row_num, ev,
                        exch_timestamp, local_timestamp, float(value), float(0)
                    )
            elif evt == 'bookTicker' and with_book_ticker:
                exch_timestamp = int(data['T']) * timestamp_mul
                row_num = _put_row(
                    tmp, row_num, best_bid_event,
                    exch_timestamp, local_timestamp, float(data['b']), float(data['B'])
                )
                row_num = _put_row(
                    tmp, row_num, best_ask_event,
                    exch_timestamp, local_timestamp, float(data['a']), float(data['A'])
                )

    # Drops the unused tail of the preallocated buffer.
    tmp = tmp[:row_num]

    print('Correcting the latency')
    tmp = correct_local_timestamp(tmp, base_latency)

    print('Correcting the event order')
    # Stable sorts keep the file order among rows that share a timestamp.
    data = correct_event_order(
        tmp,
        np.argsort(tmp['exch_ts'], kind='mergesort'),
        np.argsort(tmp['local_ts'], kind='mergesort')
    )

    validate_event_order(data)

    if output_filename is not None:
        print('Saving to %s' % output_filename)
        np.savez_compressed(output_filename, data=data)

    return data


def _put_row(
        buf: NDArray,
        row_num: int,
        ev: int,
        exch_ts: int,
        local_ts: int,
        px: float,
        qty: float
) -> int:
    """Writes one event row into ``buf[row_num]`` (trailing fields set to 0) and returns the next free row index."""
    buf[row_num] = (ev, exch_ts, local_ts, px, qty, 0, 0, 0)
    return row_num + 1