Source code for hftbacktest.data.validation

import sys

import numpy as np
from numba import njit
from numpy.typing import NDArray

from ..types import (
    EXCH_EVENT,
    LOCAL_EVENT,
    event_dtype,
    EVENT_ARRAY
)


[docs] @njit def correct_local_timestamp(data: EVENT_ARRAY, base_latency: float) -> EVENT_ARRAY: """ Adjusts the local timestamp `in place` if the feed latency is negative by offsetting it by the maximum negative latency value as follows: .. code-block:: feed_latency = local_timestamp - exch_timestamp adjusted_local_timestamp = local_timestamp + min(feed_latency, 0) + base_latency Args: data: Data to be corrected. base_latency: Due to discrepancies in system time between the exchange and the local machine, latency may be measured inaccurately, resulting in negative latency values. The conversion process automatically adjusts for positive latency but may still produce zero latency cases. By adding ``base_latency``, more realistic values can be obtained. Unit should be the same as the feed data's timestamp unit. Returns: Data with the corrected timestamps. """ latency = sys.maxsize for row_num in range(len(data)): exch_timestamp = data[row_num].exch_ts local_timestamp = data[row_num].local_ts latency = min(latency, local_timestamp - exch_timestamp) if latency < 0: local_timestamp_offset = -latency + base_latency print('local_timestamp is ahead of exch_timestamp by', -latency) for row_num in range(len(data)): data[row_num].local_ts += local_timestamp_offset return data
[docs] @njit def correct_event_order( data: EVENT_ARRAY, sorted_exch_index: NDArray, sorted_local_index: NDArray, ) -> EVENT_ARRAY: """ Corrects exchange timestamps that are reversed by splitting each row into separate events. These events are then ordered by both exchange and local timestamps through duplication. See the `data <https://hftbacktest.readthedocs.io/en/latest/data.html>`_ for details. Args: data: Data to be corrected. sorted_exch_index: Index of data sorted by exchange timestamp. sorted_local_index: Index of data sorted by local timestamp. Returns: Data with the corrected event order. """ sorted_final = np.zeros(data.shape[0] * 2, event_dtype) out_rn = 0 exch_rn = 0 local_rn = 0 while True: sorted_exch = data[sorted_exch_index[exch_rn]] sorted_local = data[sorted_local_index[local_rn]] if ( exch_rn < len(data) and local_rn < len(data) and sorted_exch.exch_ts == sorted_local.exch_ts and sorted_exch.local_ts == sorted_local.local_ts ): assert sorted_exch.ev == sorted_local.ev assert (sorted_exch.px == sorted_local.px) or (np.isnan(sorted_exch.px) and np.isnan(sorted_local.px)) assert sorted_exch.qty == sorted_local.qty sorted_final[out_rn] = sorted_exch sorted_final[out_rn].ev = sorted_final[out_rn].ev | EXCH_EVENT | LOCAL_EVENT out_rn += 1 exch_rn += 1 local_rn += 1 elif (( exch_rn < len(data) and local_rn < len(data) and sorted_exch.exch_ts == sorted_local.exch_ts and sorted_exch.local_ts < sorted_local.local_ts ) or ( exch_rn < len(data) and sorted_exch.exch_ts < sorted_local.exch_ts )): # exchange sorted_final[out_rn] = sorted_exch sorted_final[out_rn].ev = sorted_final[out_rn].ev | EXCH_EVENT out_rn += 1 exch_rn += 1 elif (( exch_rn < len(data) and local_rn < len(data) and sorted_exch.exch_ts == sorted_local.exch_ts and sorted_exch.local_ts > sorted_local.local_ts ) or ( local_rn < len(data) )): # local sorted_final[out_rn] = sorted_local sorted_final[out_rn].ev = sorted_final[out_rn].ev | LOCAL_EVENT out_rn += 1 local_rn += 1 elif exch_rn < len(data): # exchange sorted_final[out_rn] = sorted_exch sorted_final[out_rn].ev = sorted_final[out_rn].ev | EXCH_EVENT out_rn += 1 exch_rn += 1 else: assert exch_rn == len(data) assert local_rn == len(data) break return sorted_final[:out_rn]
[docs] def validate_event_order(data: EVENT_ARRAY) -> None: """ Validates that the order of events is correct. If the data contains an incorrect event order, a :class:`ValueError` will be raised. Args: data: Data to validate. """ exch_ev = data['ev'] & EXCH_EVENT == EXCH_EVENT local_ev = data['ev'] & LOCAL_EVENT == LOCAL_EVENT if np.sum(np.diff(data['exch_ts'][exch_ev]) < 0) > 0: raise ValueError('exchange events are out of order.') if np.sum(np.diff(data['local_ts'][local_ev]) < 0) > 0: raise ValueError('local events are out of order.')