Для обучения сетей и бэктеста нам потребуется исторические данные по разным инструментам. Конечно все эти данные можно найти и скачать в открытом доступе, так же на github можно найти множество различных грабберов для сбора этих данных с разных бирж. Можно не изобретать велосипед, а можно изобрести, так что пишем свой класс для сбора данных.
Что должно быть на выходе ? На выходе я хочу вызвать метод, передать в него: тип, инструмент, таймфрейм и интервал с и по какие даты и время собрать инфу. И в последствии выполнения должен сформироваться csv файл, с которым в дальнейшем я смогу работать.
Создадим новый файл LoadHistory.py и подключим нужные библиотеки
import json
import math
import pandas as pd
import requests
import time
from datetime import datetime
Создадим класс с конструктором и пропишем в нем некие константы с которыми будем взаимодействовать
class LoadHistoryData:
# ТИП: Спот или Фьючерсы
MARKET = {
"SPOT": "https://api.binance.com/api/v3",
"FUTURE": "https://fapi.binance.com/fapi/v1"
}
# ТАЙМФРЕЙМ
TF = {
"1m": 1,
"5m": 5,
"15m": 15,
"30m": 30,
"1h": 60,
"4h": 240,
"1d": 1440
}
# Переменные для работы
_limit = None
_limit_max = None
_request_count = None
_market = None
_t_d, _f_d = None, None
_t, _f, _tf, _sym = None, None, None, None
def __init__(self):
pass
# Функция загрузки данных
def load(self):
pass
Наполним конструктор смысловой нагрузкой. При создании класса будем передавать тип, это может быть либо фьючерсный рынок либо спотовый, полное наименование инструмента, таймфрейм, дата и время начала и окончания данных.
def __init__(self, market, sym, tf, f, t):
self._sym = sym
self._tf = tf
self._market = market
# Максимальное кол-во баров у спота и фьюча разное
self._limit_max = 1000 if market == 'SPOT' else 1500
# Обрабатываем и конвертируем дату начала
self._t = datetime.strptime(t, '%Y-%m-%d %H:%M:%S')
self._t_d = str(datetime.strftime(self._t, '_%Y%m%d_%H%M_'))
self._t = int(datetime.timestamp(self._t))
# Обрабатываем и конвертируем дату окончания
self._f = datetime.strptime(f, '%Y-%m-%d %H:%M:%S')
self._f_d = str(datetime.strftime(self._f, '_%Y%m%d_%H%M'))
self._f = int(datetime.timestamp(self._f))
# Рассчитываем кол-во баров необходимых для закрытия переданных дат
self._limit = int((self._t - self._f) / (self.TF[self._tf] * 60))
# Рассчитывем кол-во запросов исходя из максимального кол-ва баров
self._request_count = math.ceil(self._limit / self._limit_max)
Так же напишем код для основного метода загрузки
def load(self):
payload={}
headers = {'Content-Type': 'application/json'}
data = list()
cur_limit = self._limit_max if self._request_count > 1 else self._limit
cur_f = self._f
cur_t = self._f + cur_limit * (self.TF[self._tf] * 60)
for r in range(0, self._request_count):
url = "{}/klines?symbol={}&interval={}&limit={}&startTime={}&endTime={}".format(
self.MARKET[self._market],
self._sym,
self._tf,
cur_limit,
"{}000".format(cur_f),
"{}000".format(cur_t)
)
self._limit -= self._limit_max
cur_limit = cur_limit if self._limit >= self._limit_max else self._limit
cur_f = cur_t
cur_t = cur_t + cur_limit * (self.TF[self._tf] * 60)
response = requests.request("GET", url, headers=headers, data=payload)
if response.status_code == 200:
d = json.loads(response.text)
data.extend(d)
# Сделаем паузу, чтобы не грузить биржу
time.sleep(1.5)
# Произведем постобработку списка
for item in data:
item[0] = int(str(item[0])[:-3])
item.pop(11)
item.pop(6)
# Создадим датафрейм и присвоим имена колонок
df = pd.DataFrame(data, columns=[
'Open time',
'Open',
'High',
'Low',
'Close',
'Volume',
'Quote asset volume',
'Number of trades',
'Taker buy base asset volume',
'Taker buy quote asset volume'
])
# Сформируем имя для csv файла и сохраним его
nafullnameme = self._market[:-5] + '_' + self._sym + '_' + self._tf + '_' + self._f_d + '_' + self._t_d + '.csv'
df.to_csv(nafullnameme, index=False)
Наш класс выгрузки данных готов! У вас должен получиться следующий код
import json
import math
import pandas as pd
import requests
import time
from datetime import datetime
class LoadHistoryData:
# ТИП: Спот или Фьючерсы
MARKET = {
"SPOT": "https://api.binance.com/api/v3",
"FUTURE": "https://fapi.binance.com/fapi/v1"
}
# ТАЙМФРЕЙМ
TF = {
"1m": 1,
"5m": 5,
"15m": 15,
"30m": 30,
"1h": 60,
"4h": 240,
"1d": 1440
}
# Переменные для работы
_limit = None
_limit_max = None
_request_count = None
_market = None
_t_d, _f_d = None, None
_t, _f, _tf, _sym = None, None, None, None
def __init__(self, market, sym, tf, f, t):
self._sym = sym
self._tf = tf
self._market = market
# Максимальное кол-во баров у спота и фьюча разное
self._limit_max = 1000 if market == 'SPOT' else 1500
# Обрабатываем и конвертируем дату начала
self._t = datetime.strptime(t, '%Y-%m-%d %H:%M:%S')
self._t_d = str(datetime.strftime(self._t, '_%Y%m%d_%H%M_'))
self._t = int(datetime.timestamp(self._t))
# Обрабатываем и конвертируем дату окончания
self._f = datetime.strptime(f, '%Y-%m-%d %H:%M:%S')
self._f_d = str(datetime.strftime(self._f, '_%Y%m%d_%H%M'))
self._f = int(datetime.timestamp(self._f))
# Рассчитываем кол-во баров необходимых для закрытия переданных дат
self._limit = int((self._t - self._f) / (self.TF[self._tf] * 60))
# Рассчитывем кол-во запросов исходя из максимального кол-ва баров
self._request_count = math.ceil(self._limit / self._limit_max)
# Функция загрузки данных
def load(self):
payload={}
headers = {'Content-Type': 'application/json'}
data = list()
cur_limit = self._limit_max if self._request_count > 1 else self._limit
cur_f = self._f
cur_t = self._f + cur_limit * (self.TF[self._tf] * 60)
for r in range(0, self._request_count):
url = "{}/klines?symbol={}&interval={}&limit={}&startTime={}&endTime={}".format(
self.MARKET[self._market],
self._sym,
self._tf,
cur_limit,
"{}000".format(cur_f),
"{}000".format(cur_t)
)
self._limit -= self._limit_max
cur_limit = cur_limit if self._limit >= self._limit_max else self._limit
cur_f = cur_t
cur_t = cur_t + cur_limit * (self.TF[self._tf] * 60)
response = requests.request("GET", url, headers=headers, data=payload)
if response.status_code == 200:
d = json.loads(response.text)
data.extend(d)
# Сделаем паузу, чтобы не грузить биржу
time.sleep(1.5)
# Произведем постобработку списка
for item in data:
item[0] = int(str(item[0])[:-3])
item.pop(11)
item.pop(6)
# Создадим датафрейм и присвоим имена колонок
df = pd.DataFrame(data, columns=[
'Open time',
'Open',
'High',
'Low',
'Close',
'Volume',
'Quote asset volume',
'Number of trades',
'Taker buy base asset volume',
'Taker buy quote asset volume'
])
# Сформируем имя для csv файла и сохраним его
nafullnameme = self._market[:1] + '_' + self._sym + '_' + self._tf + '_' + self._f_d + '_' + self._t_d + '.csv'
df.to_csv(nafullnameme, index=False)
Теперь, когда все готово, можно опробовать, что у нас получилось
market='SPOT'
sym = 'BTCUSDT'
tf = '1h'
f = '2020-01-01 00:00:00'
t = '2021-12-31 23:59:59'
history = LoadHistoryData(market, sym, tf, f, t)
history.load()
Наш скрипт сделает рассчеты и выполнит нужное кол-во запросов к бирже, за процессом можно наблюдать в консоле
В конце процесса в вашей папке сформируется csv файл
Откроем его и убедимся что все данные загружены. Сам вывод может отличаться от моего, в зависимости от вашего редактора кода и установленных модулей
На этом все! В следующих статьях с помощью данного класса мы будем формировать себе датасеты для обучения своих сететей.