В прошлой статье мы создали инструмент для выгрузки и обработки данных стакана. И закрыли вопрос касаемо, где брать эти данные. Теперь, когда данные выгружены в виде своего рода датасетов, мы можем написать эмулятор, который будет транслировать эти данные.
Существует разное множество баз данных для работы с временными рядами, к примеру InfluxDB и ей подобные. Я же, решил, что мне будет удобней работать с ElasticDB.
Если посмотреть схематично на реализацию, то она будет состоять из следующих компонентов:
— Конфиг для docker-compose для деплоя Elasticsearch и Kibana (второе необязательно)
— Скрипт загрузки данных из файлов в базу
— WebSocketServer который собственно и будет транслировать наши данные из бд
— WebSockerClient для удобства и мониторинга
Первым делом задеплоим докер контейнер с Elasticsearch. В него мы загрузим все данные из файлов по иструментам, которые подготавливали в прошлой статье.
version: '3.7'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.4.0
container_name: elasticsearch
environment:
- xpack.security.enabled=false
- discovery.type=single-node
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
cap_add:
- IPC_LOCK
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
ports:
- 9200:9200
- 9300:9300
kibana:
container_name: kibana
image: docker.elastic.co/kibana/kibana:7.4.0
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- 5601:5601
depends_on:
- elasticsearch
volumes:
elasticsearch-data:
driver: local
После того как контейнер соберется и запустится, можно загрузить в бд наши подготовленные данные, для этого создайте python скрипт со следующим содержанием.
import datetime
import os
import sys
from pprint import pprint as pp
from memory_profiler import profile
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk, bulk
from dotenv import load_dotenv
load_dotenv()
class Parser:
es = Elasticsearch(os.environ["DB"])
symbol = None
iterator = 1
bulk_data = list()
def __init__(self, symbol):
self.symbol = symbol
def add(self, data: list, send: int = 0):
self.bulk_data.append({
"_op_type": "index",
"_index": self.symbol,
"_source": data,
})
if len(self.bulk_data) == 1000 or send == 1:
bulk(self.es, self.bulk_data)
self.bulk_data = list()
self.iterator+=1
def prepare(self, fl: str):
with open(fl, "r") as f:
i = 0; _data = dict({"ts":0,"a":[],"b":[],"delay":0})
for n in f:
if i==0: i+=1; continue
if (i%1000000)==0: print(i)
nd = n.split(",")
if int(nd[0]) == _data["ts"]:
if nd[1] == 'a':
_data['a'].append( [float(nd[2]), float(nd[3])] )
elif nd[1] == 'b':
_data['b'].append( [float(nd[2]), float(nd[3])] )
elif not _data["ts"]:
if nd[1] == 'a':
_data['a'].append( [float(nd[2]), float(nd[3])] )
elif nd[1] == 'b':
_data['b'].append( [float(nd[2]), float(nd[3])] )
_data["ts"] = int(nd[0])
_data['delay'] = float(nd[4].replace("\n", ""))
else:
self.add(_data)
_data = dict({"ts":0,"a":[],"b":[],"delay":0})
i+=1
if _data: self.add(_data,1); del _data
name = os.environ["SYMBOL"]
p = Parser(name)
directory = os.environ["PATH_TO_FILES"]
for n in os.listdir(directory):
nd = os.path.join(directory,n)
if os.path.isdir(nd) and name.upper() in n:
for f in os.listdir(nd):
print(nd + "/"+ f)
s = datetime.datetime.now()
p.prepare(nd + "/"+ f)
delay = datetime.datetime.now() - s
print("Time execute: " + str(delay))
print("Rows in DB: " + str(p.iterator))
И сразу же файл конфига
DB="http://localhost:9200"
PATH_TO_FILES="Полный путь до директории с файлами"
SYMBOL="Наименование инструмента, к примеру atomusdt"
WS_URL="ws://localhost:5678"
WS_HOST="127.0.0.1"
WS_PORT="5678"
EMU_SYMBOL="atomusdt"
EMU_FROM="Дата и время с которого будет трансляция к примеру 01.01.2022 10:29:20.221"
Теперь это можно запустить, и загрузить все файлы в бд. Это может занят достаточно продолжительное время, поэтому наберитесь терпения.
Следующим шагом будет реализация вебсокет сервера, это сам механизм отвечающий за трансляцию данных.
from elasticsearch import Elasticsearch
from datetime import datetime, timezone
from pprint import pprint as pp
import asyncio
import random
import websockets
import json
import sys
import threading
from dotenv import load_dotenv
load_dotenv()
es = Elasticsearch()
class Emulator:
data = None
ts_from = None
index = None
delta = 60
def __init__(self,symbol, dt):
self.ts_from = int(datetime.strptime(dt, "%d.%m.%Y %H:%M:%S.%f").replace(tzinfo=timezone.utc).timestamp() * 1000)
self.index = symbol
self._getData()
def _ts_to(self, ts_from):
return ts_from + self.delta * 1000
def _getData(self):
q = {
"query": {
"range": {
"ts": {
"gte": self.ts_from,
"lt": self._ts_to(self.ts_from)
}
}
},
"size": 1000,
}
resp = es.search(index=self.index, body=q)
if resp['hits']['total']['value'] > 0:
self.data = [hit['_source'] for hit in resp['hits']['hits']]
print("Got %d Hits" % resp['hits']['total']['value'])
print("In self.data: %d"% len(self.data))
def _append(self, ts_from):
q = {
"query": {
"range": {
"ts": {
"gte": ts_from,
"lt": self._ts_to(ts_from)
}
}
},
"size": 1000,
}
resp = es.search(index=self.index, body=q)
if resp['hits']['total']['value'] > 0:
self.data.extend([hit['_source'] for hit in resp['hits']['hits']])
print("Got %d Hits" % resp['hits']['total']['value'])
print("In self.data after append: %d"% len(self.data))
async def translation(self, websocket, path):
while True:
if self.data:
current = self.data.pop(0)
sys.stdout.write("\rCurrent length: %d "% len(self.data))
sys.stdout.flush()
await websocket.send(json.dumps(current))
if len(self.data) < 500:
threading.Thread(target=self._append, args=[self.data[len(self.data)-1]["ts"]]).start()
await asyncio.sleep(int(current['delay'])/1000)
def run(self):
start_server = websockets.serve(self.translation, '127.0.0.1', 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
em = Emulator("atomusdt","01.01.2022 10:29:20.221")
em.run()
Для начала работы его достаточно запустить, особых настроек у него нет. Стоит упомянуть, что как таковых данных в бд большое множество и было бы не целесообразно выгружать все, поэтому в данном коде предусмотрена динамическая подгрузка данных при достижении определенного лимита, а так же реализована задержка, чтобы максимально приблизиться к боевым условиям.
После запуска websocket сервера, при подключении к нему клиента, он начнет транслировать аск и бид цены, для подключения можно использоваться сторонние программы для работы с сокетами, либо проверить через плагины для хром браузера.
Но для полноты картины напишем клиент для сервера.
import asyncio
import websockets
from pprint import pprint as pp
import json
import os
import sys
from collections import OrderedDict
from dotenv import load_dotenv
load_dotenv()
class WSClient:
asks = dict()
bids = dict()
best_bid = None
best_ask = None
async def connect(self):
uri = os.environ['WS_URL']
async with websockets.connect(uri) as websocket:
while 1:
obj = await websocket.recv()
obj = json.loads(obj)
best_bid = .0
for n in obj['b']:
if best_bid < n[0] and n[1] > .0:
best_bid = n[0]
self.best_bid = best_bid
best_ask = 999999999
for n in obj['a']:
if best_ask > n[0] and n[1] > .0:
best_ask = n[0]
self.best_ask = best_ask
self._pp()
def _pp(self):
sys.stdout.write("\rASK: %f BID: %f SPREAD: %f\t\t\t"% (self.best_ask,self.best_bid, (self.best_ask-self.best_bid)))
sys.stdout.flush()
try:
ws = WSClient()
asyncio.run(ws.connect())
except KeyboardInterrupt:
print("\n Stop")
Теперь запустив клиент и сервер, можно посмотреть как работает наша сборка
Или проверив работу запустив клиента в виде плагина chrome
Репозиторий на github