Обробка даних в реальному часі та панель приладів з Python у 2026

Автор Tuvze, ер. 06, 2026, 02:48 PM

« попередня та - наступна тема »

Tuvze

Вступ
У 2026 році додатки, орієнтовані на дані, вже не можуть бути пакетними, вони мусять працювати в реальному часі. У цьому посібнику ми створимо з нуля Панель приладів для відстеження цін на криптовалюти в реальному часі:
  • FastAPI backend: Трансляція цін у реальному часі через WebSocket
  • Redis Pub/Sub: Керування потоком даних
  • Streamlit frontend: Інтерактивна панель приладів, що оновлюється миттєво
  • Plotly для професійних графіків
  • Pandas для швидкої обробки даних
  • Симульоване джерело даних (у реальності можна використовувати API Binance, CoinGecko)

Проект пропонує production-ready основу: масштабовану, чисту та сучасну.

Вимоги
Python 3.11+, Redis (легко з docker: docker run -p 6379:6379 redis)
pip install fastapi uvicorn redis websockets streamlit pandas plotly httpx py
1. Структура проекту
Цитувати(Тут оригінал не має детального опису структури, але зображення показує дерево файлів:
real-time-crypto-dashboard/
├── backend/
│  ├── main.py  # FastAPI app
│  ├── ws_manager.py  # WebSocket connection manager
│  ├── price_simulator.py
├── frontend/
│  ├── dashboard.py  # Streamlit app
└── requirements.txt
)

2. Backend: FastAPI + WebSockets + Redis
backend/ws_manager.py
from fastapi import WebSocket
from typing import List
import asyncio
import json

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

backend/price_simulator.py (симуляція замість реального API – якщо бажаєте, переведіть на WebSocket Binance)
import asyncio
import random
import json
from datetime import datetime

async def simulate_price_stream(manager: ConnectionManager, redis_client):
    symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT"]
    while True:
        for symbol in symbols:
            price = round(
                random.uniform(20000, 100000) if symbol == "BTCUSDT" else
                random.uniform(1000, 5000) if symbol == "ETHUSDT" else
                random.uniform(50, 300), 2
            )
            data = {
                "symbol": symbol,
                "price": price,
                "timestamp": datetime.utcnow().isoformat()
            }
            # Публікувати в Redis (інші сервіси можуть прослуховувати)
            await redis_client.publish("crypto_prices", json.dumps(data))
            # Пряма трансляція через WebSocket
            await manager.broadcast(json.dumps(data))
        await asyncio.sleep(1)  # Оновлення кожну 1 секунду

backend/main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
import redis.asyncio as redis
from ws_manager import ConnectionManager
from price_simulator import simulate_price_stream
import os
from dotenv import load_dotenv

load_dotenv()

app = FastAPI()
manager = ConnectionManager()
redis_client = redis.from_url("redis://localhost:6379/0")

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(simulate_price_stream(manager, redis_client))

@app.websocket("/ws/prices")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()  # Ми не хочемо повідомлень від клієнта, але
    except WebSocketDisconnect:
        manager.disconnect(websocket)

@app.get("/")
async def root():
    return {"message": "Backend панелі приладів криптовалют в реальному часі працює"}

3. Frontend: Streamlit Dashboard
frontend/dashboard.py
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
import asyncio
import json
import websockets
from datetime import datetime
import threading

st.set_page_config(page_title="Панель приладів криптовалют в реальному часі 2026", layout="wide")

# Зберігання даних
if "price_data" not in st.session_state:
    st.session_state.price_data = {sym: pd.DataFrame(columns=["timestamp", "price"]) for sym in ["BTCUSDT", "ETHUSDT", "SOLUSDT"]}

async def listen_websocket():
    uri = "ws://localhost:8000/ws/prices"
    async with websockets.connect(uri) as websocket:
        while True:
            try:
                message = await websocket.recv()
                data = json.loads(message)
                symbol = data["symbol"]
                price = data["price"]
                ts = pd.to_datetime(data["timestamp"])
                new_row = pd.DataFrame({"timestamp": [ts], "price": [price]})
                st.session_state.price_data[symbol] = pd.concat([st.session_state.price_data[symbol], new_row], ignore_index=True).tail(300)  # Зберігати останні 300 записів
                # Тригер перезапуску Streamlit (не thread-safe, але для простої демо)
                st.rerun()
            except Exception as e:
                print(f"Помилка WebSocket: {e}")
                await asyncio.sleep(5)

# Запуск прослуховувача WebSocket у фоновому потоці
if "ws_thread" not in st.session_state:
    st.session_state.ws_thread = threading.Thread(
        target=lambda: asyncio.run(listen_websocket()), daemon=True
    )
    st.session_state.ws_thread.start()

st.title("Панель приладів криптовалют в реальному часі (2026)")

col1, col2 = st.columns(2)

for symbol in st.session_state.price_data:
    df = st.session_state.price_data[symbol]
    if not df.empty:
        with (col1 if symbol == "BTCUSDT" else col2):
            st.subheader(f"{symbol} - Остання ціна: ${df['price'].iloc[-1]:,.2f}")
            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=df["timestamp"],
                y=df["price"],
                mode='lines+markers',
                name=symbol,
                line=dict(color='royalblue' if symbol == "BTCUSDT" else 'orange')
            ))
            fig.update_layout(
                title=f"{symbol} Останні 5 хвилин",
                xaxis_title="Час",
                yaxis_title="Ціна (USDT)",
                height=400,
                template="plotly_dark" if st.get_option("theme.base") == "dark" else "plotly"
            )
            st.plotly_chart(fig, use_container_width=True)

st.markdown("---")
st.caption("Backend: FastAPI + WebSockets + Redis | Frontend: Streamlit | Версія: 1.0.0")

4. Запуск
Термінал 1: Запустіть Redis (з docker)
Термінал 2:
cd backend
uvicorn main:app --reload --port 8000

Термінал 3:
cd frontend
streamlit run dashboard.py

5. Поради для просунутих (2026)
• Реальні дані: Інтегруйте WebSocket з ccxt або binance-python
• Замість Redis: Масштабуйте з Kafka / RabbitMQ
• Замість Streamlit: Taipy або Solara (більш професійний UI)
• Docker Compose: Підніміть весь стек однією командою
• Аутентифікація: JWT + FastAPI Users
• Деплой: Render / Railway / Fly.io (є безкоштовні рівні)