DEV Community

Cover image for Получение донатов DonationAlerts в реальном времени: Руководство по WebSocket и OAuth 2.0 на Python (FastAPI)
Фёдор Пасынков
Фёдор Пасынков

Posted on

Получение донатов DonationAlerts в реальном времени: Руководство по WebSocket и OAuth 2.0 на Python (FastAPI)

Привет, разработчики! 👋

Вы когда-нибудь хотели мгновенно реагировать на получение доната через DonationAlerts? Может быть, запустить эффект на стриме, обновить дашборд или просто надежно логировать донаты без постоянных запросов к API? Опрос API (polling) работает, но он неэффективен и не обеспечивает реального времени.

DonationAlerts предлагает способ получать эти уведомления мгновенно с помощью WebSockets, но настройка включает в себя прохождение потока OAuth 2.0 и работу с их специфичным протоколом WebSocket (на базе Centrifugo).

В этом руководстве мы пройдемся по шагам:

  1. Регистрация приложения в DonationAlerts.
  2. Реализация потока авторизации OAuth 2.0 Authorization Code с использованием Python (на FastAPI).
  3. Подключение к WebSocket DonationAlerts (Centrifugo).
  4. Аутентификация, подписка и получение сообщений о донатах в реальном времени.
  5. Понимание формата данных о донатах.

Давайте откажемся от опросов и настроим получение донатов в реальном времени!

Предварительные требования

  • Аккаунт DonationAlerts (аккаунт стримера, который вы хотите отслеживать).
  • Установленный Python 3.8+.
  • Базовое знакомство с Python. Знание FastAPI полезно, но не строго обязательно для понимания концепций.
  • pip для установки пакетов.

(Мы будем использовать FastAPI для веб-сервера, обрабатывающего OAuth, но основную логику взаимодействия с API и WebSocket DonationAlerts можно адаптировать для других фреймворков.)

Шаг 1: Регистрация вашего приложения в DonationAlerts 🔑

Прежде чем писать код, нам нужно сообщить DonationAlerts о нашем приложении и получить учетные данные.

  1. Перейдите на страницу приложений DonationAlerts OAuth: https://www.donationalerts.com/application/clients
  2. Войдите в систему, используя свой аккаунт стримера (Twitch, YouTube и т.д.).
  3. На странице "OAuth API Applications" нажмите кнопку "+ СОЗДАТЬ НОВОЕ ПРИЛОЖЕНИЕ".
  4. Заполните форму "Новое приложение":
    • Название приложения: Дайте ему описательное имя, например, "Мой Монитор Реального Времени". (Оно будет показано пользователям).
    • URL перенаправления (Redirect URL): Это КРИТИЧЕСКИ ВАЖНО. Введите точный URL, на который DonationAlerts отправит пользователя после того, как он авторизует ваше приложение. Для локальной разработки с нашим примером используйте: http://localhost:8000/api/auth/donationalerts/callback. При развертывании это должен быть ваш публичный URL обратного вызова.
  5. Нажмите оранжевую кнопку "СОЗДАТЬ".
  6. Найдите созданное приложение в списке. Вам понадобятся две части информации:
    • ID Приложения (App ID): Ваш уникальный идентификатор приложения. Будем называть его APP_ID.
    • API Ключ (API Key): Секрет вашего приложения. Храните его в безопасности и никогда не раскрывайте во фронтенд-коде! Будем называть его API_KEY.

Скопируйте эти два значения (APP_ID и API_KEY). Они скоро нам понадобятся.

Шаг 2: Понимание потока OAuth 2.0 🌊

Нам нужно, чтобы пользователь (стример) предоставил нашему приложению разрешение на доступ к своим данным (в частности, на просмотр профиля и подписку на события донатов), не передавая нам свой пароль. Именно для этого и нужен OAuth 2.0. Мы будем использовать поток "Authorization Code", стандартный для веб-приложений.

Вот суть:

  1. Инициация пользователем: Пользователь нажимает кнопку "Войти через DonationAlerts" в нашем приложении.
  2. Перенаправление на DonationAlerts: Наш бэкенд перенаправляет браузер пользователя на специальный URL DonationAlerts, включая наш APP_ID, запрошенные scopes (права доступа) и redirect_uri.
  3. Авторизация пользователем: Пользователь входит в DonationAlerts (если необходимо) и видит экран с запросом на одобрение разрешений, которые запросило наше приложение (oauth-user-show, oauth-donation-subscribe).
  4. Перенаправление обратно с кодом: Если одобрено, DonationAlerts перенаправляет браузер пользователя обратно на наш redirect_uri, добавляя временный code к URL.
  5. Обмен кода на токен: Наш бэкенд получает этот code, проверяет его, а затем безопасно выполняет запрос сервер-сервер к DonationAlerts, отправляя code, наш APP_ID и наш секретный API_KEY.
  6. Получение токенов: DonationAlerts проверяет все и отправляет обратно access_token (используется для вызовов API) и refresh_token (используется для получения нового access_token, когда старый истечет).

Этот access_token является ключом к взаимодействию с API DonationAlerts от имени пользователя.

Шаг 3: Реализация OAuth на Python (Пример с FastAPI) 🐍

Давайте настроим простое приложение FastAPI для обработки этого потока.

(Полный рабочий пример, включая хранение токенов, управление WebSocket и интерактивный помощник по настройке, смотрите в коде репозитория donationalerts-oauth-websocket-example.)

Конфигурация (config.py / .env)

Лучшей практикой является хранение учетных данных и настроек вне кода. Мы можем использовать файл .env и вспомогательный модуль config.py (как показано в полном примере) для их загрузки. Ваш .env будет выглядеть примерно так:

# .env APP_ID="ВАШ_APP_ID_ЗДЕСЬ" API_KEY="ВАШ_API_KEY_ЗДЕСЬ" REDIRECT_URI="http://localhost:8000/api/auth/donationalerts/callback" SESSION_SECRET_KEY="СГЕНЕРИРУЙТЕ_НАДЕЖНЫЙ_СЛУЧАЙНЫЙ_КЛЮЧ_ЗДЕСЬ" # Для безопасности сессии # Обычно фиксированные значения DA_SCOPES="oauth-user-show oauth-donation-subscribe" DA_AUTHORIZATION_URL="https://www.donationalerts.com/oauth/authorize" DA_TOKEN_URL="https://www.donationalerts.com/oauth/token" DA_API_BASE_URL="https://www.donationalerts.com/api/v1" # ... другие URL, если необходимо 
Enter fullscreen mode Exit fullscreen mode

Эндпоинты FastAPI (main.py)

Нам нужны два основных эндпоинта: один для начала входа и один для обработки обратного вызова (callback).

# main.py (Упрощенные фрагменты) import httpx import secrets import os # Для загрузки из .env from fastapi import FastAPI, Request, Depends from fastapi.responses import RedirectResponse from starlette.middleware.sessions import SessionMiddleware # Для хранения state from pydantic_settings import BaseSettings # Для загрузки конфига  # --- Загрузка конфигурации --- class Settings(BaseSettings): APP_ID: str API_KEY: str REDIRECT_URI: str SESSION_SECRET_KEY: str DA_SCOPES: str = "oauth-user-show oauth-donation-subscribe" DA_AUTHORIZATION_URL: str = "https://www.donationalerts.com/oauth/authorize" DA_TOKEN_URL: str = "https://www.donationalerts.com/oauth/token" DA_API_BASE_URL: str = "https://www.donationalerts.com/api/v1" DA_CENTRIFUGO_URL: str = "wss://centrifugo.donationalerts.com/connection/websocket" class Config: env_file = '.env' # Указываем файл .env  settings = Settings() # Загружаем настройки при старте  app = FastAPI() # ВАЖНО: SessionMiddleware необходим для временного хранения состояния OAuth app.add_middleware(SessionMiddleware, secret_key=settings.SESSION_SECRET_KEY) # --- Эндпоинт входа --- @app.get("/api/auth/donationalerts/login") async def login_donationalerts(request: Request): state = secrets.token_urlsafe(16) request.session['oauth_state'] = state # Сохраняем state для предотвращения CSRF  params = { "client_id": settings.APP_ID, "redirect_uri": settings.REDIRECT_URI, "response_type": "code", "scope": settings.DA_SCOPES, "state": state, } # Используем httpx для построения URL с параметрами  auth_request = httpx.Request('GET', settings.DA_AUTHORIZATION_URL, params=params) authorization_url = str(auth_request.url) print(f"Перенаправление на: {authorization_url}") # Для отладки  return RedirectResponse(authorization_url) # --- Эндпоинт обратного вызова (Callback) --- @app.get("/api/auth/donationalerts/callback") async def auth_donationalerts_callback(request: Request, code: str = None, state: str = None, error: str = None): if error: # Обработка ошибки авторизации от DonationAlerts  print(f"Ошибка авторизации от DonationAlerts: {error}") # В реальном приложении показать пользователю сообщение об ошибке  return {"error": error, "description": "Пользователь отклонил авторизацию или произошла ошибка."} # --- Проверка безопасности: Сверка State ---  stored_state = request.session.pop('oauth_state', None) if not state or state != stored_state: # Несовпадение state, возможная CSRF-атака!  print("Ошибка: Неверный state параметр.") return {"error": "invalid_state", "description": "Несовпадение параметра state. Возможна CSRF атака."} if not code: print("Ошибка: Отсутствует код авторизации.") return {"error": "missing_code", "description": "В параметрах ответа отсутствует код авторизации."} # --- Обмен кода на токены ---  token_data = await exchange_code_for_token(code) # См. функцию ниже  if not token_data or "access_token" not in token_data: print("Ошибка: Не удалось обменять код на токен.") return {"error": "token_exchange_failed", "description": "Не удалось получить токен доступа от DonationAlerts."} # --- Успех! Безопасно сохраняем токены ---  # В реальном приложении: сохраните token_data['access_token'], token_data['refresh_token']  # и вычислите время истечения токена (token_data['expires_in']) в базе данных, связанной с пользователем.  # Для этого примера мы могли бы использовать простое файловое хранилище (например, token_storage.py).  # save_token(token_data) # Как в token_storage.py из полного примера  print("Авторизация OAuth прошла успешно, токены получены!") print(f"Access Token: {token_data['access_token'][:10]}...") # Не логируйте полный токен  print(f"Refresh Token: {token_data.get('refresh_token', 'N/A')[:10]}...") print(f"Expires in: {token_data.get('expires_in')} seconds") # TODO: Сохранить токены (например, в файл или базу данных)  # store_tokens(token_data['access_token'], token_data['refresh_token'], token_data['expires_in'])  # Перенаправляем пользователя обратно на главную страницу вашего приложения  # status_code=303 (See Other) рекомендуется после POST или для перенаправления после завершения действия  return RedirectResponse(url="/?status=success", status_code=303) # --- Вспомогательная функция: Обмен кода на токен --- async def exchange_code_for_token(code: str) -> dict | None: data = { "grant_type": "authorization_code", "client_id": settings.APP_ID, "client_secret": settings.API_KEY, # Используем API Key как client_secret  "code": code, "redirect_uri": settings.REDIRECT_URI, } async with httpx.AsyncClient() as client: try: print(f"Отправка запроса на обмен токена на {settings.DA_TOKEN_URL}") response = await client.post(settings.DA_TOKEN_URL, data=data) response.raise_for_status() # Проверяем наличие HTTP ошибок (4xx, 5xx)  token_info = response.json() print("Обмен токена прошел успешно!") return token_info except httpx.HTTPStatusError as e: # Логируем ошибку, если DonationAlerts вернул статус ошибки  print(f"Ошибка при обмене токена: Статус {e.response.status_code}") try: # Попытка прочитать тело ответа для получения деталей ошибки  error_details = e.response.json() print(f"Тело ответа ошибки: {error_details}") except Exception: print(f"Тело ответа ошибки (не JSON): {e.response.text}") return None except httpx.RequestError as e: # Логируем ошибки сети или соединения  print(f"Ошибка сети при запросе обмена токена: {e}") return None except Exception as e: # Логируем другие непредвиденные ошибки  print(f"Произошла непредвиденная ошибка при обмене токена: {e}") return None 
Enter fullscreen mode Exit fullscreen mode

Этот код обрабатывает основной поток OAuth. Не забудьте безопасно хранить полученные access_token и refresh_token.

Шаг 4: Подключение к WebSocket (Centrifugo) 🕸️

Теперь, когда у нас есть access_token, мы можем подключиться к WebSocket для получения событий в реальном времени. Здесь становится немного сложнее, так как задействованы три разных типа токенов:

  1. access_token OAuth: Тот, который мы только что получили. Необходим для вызовов API.
  2. socket_connection_token: Временный токен, предназначенный специально для аутентификации самого WebSocket-соединения. Мы получаем его из эндпоинта API /api/v1/user/oauth, используя наш access_token.
  3. subscription_token: Еще один временный токен, необходимый для подписки на определенный приватный канал (например, канал донатов) после аутентификации WebSocket. Мы получаем его из эндпоинта /api/v1/centrifuge/subscribe, используя наш access_token и client_id, предоставленный WebSocket при аутентификации.

Последовательность подключения:

Вот последовательность, реализованная в donationalerts_client.py нашего примера:

# donationalerts_client.py (Упрощенные фрагменты) import websockets import json import asyncio import httpx import time from datetime import datetime, timedelta # Предполагается, что 'settings' загружены (как в main.py), # и у нас есть валидный 'access_token' (полученный и сохраненный ранее). # Также предполагается наличие функции `_make_api_request` для запросов к API # и функции для управления/обновления токенов (get_valid_access_token, refresh_tokens).  # --- Глобальные переменные для управления состоянием --- current_access_token: str | None = None current_refresh_token: str | None = None token_expires_at: datetime | None = None # TODO: Реализовать функции загрузки/сохранения токенов (например, в файл) # def load_tokens(): ... # def save_tokens(access, refresh, expires_in): ...  async def get_valid_access_token() -> str | None: """Возвращает действительный access_token, при необходимости обновляя его.""" global current_access_token, token_expires_at if not current_access_token or not token_expires_at: # Попытка загрузить токены, если их нет в памяти  # loaded = load_tokens() # Реализация зависит от способа хранения  # if not loaded:  print("Нет сохраненных токенов.") return None # Проверяем, не истек ли токен (с небольшим запасом, например, 5 минут)  if datetime.now() >= (token_expires_at - timedelta(minutes=5)): print("Access token истек или скоро истечет, пытаемся обновить...") refreshed = await refresh_tokens() if not refreshed: print("Не удалось обновить токен.") return None # Обновляем глобальные переменные после успешного обновления  # (предполагается, что refresh_tokens обновляет их)  print("Токен успешно обновлен.") return current_access_token async def refresh_tokens() -> bool: """Обновляет access_token, используя refresh_token.""" global current_access_token, current_refresh_token, token_expires_at, settings if not current_refresh_token: print("Нет refresh token для обновления.") return False data = { "grant_type": "refresh_token", "refresh_token": current_refresh_token, "client_id": settings.APP_ID, "client_secret": settings.API_KEY, } async with httpx.AsyncClient() as client: try: print(f"Запрос на обновление токена на {settings.DA_TOKEN_URL}") response = await client.post(settings.DA_TOKEN_URL, data=data) response.raise_for_status() new_token_data = response.json() # --- Обновляем глобальные переменные и сохраняем ---  current_access_token = new_token_data['access_token'] # Некоторые API могут не возвращать refresh_token при обновлении  current_refresh_token = new_token_data.get('refresh_token', current_refresh_token) expires_in = new_token_data['expires_in'] token_expires_at = datetime.now() + timedelta(seconds=expires_in) # save_tokens(current_access_token, current_refresh_token, expires_in) # Сохраняем обновленные  print(f"Токен обновлен. Новый Access Token: {current_access_token[:10]}...") return True except httpx.HTTPStatusError as e: print(f"Ошибка при обновлении токена: Статус {e.response.status_code}") try: print(f"Тело ошибки: {e.response.json()}") except Exception: print(f"Тело ошибки (не JSON): {e.response.text}") # Если ошибка 400/401 с refresh_token, возможно, он отозван, требуется повторная авторизация  if e.response.status_code in [400, 401]: print("Refresh token недействителен. Требуется повторная авторизация пользователя.") # Сбросить токены, чтобы запросить новую авторизацию  current_access_token = None current_refresh_token = None token_expires_at = None # remove_saved_tokens()  return False except Exception as e: print(f"Непредвиденная ошибка при обновлении токена: {e}") return False async def _make_api_request(method: str, endpoint: str, token: str | None = None, **kwargs) -> dict | None: """Вспомогательная функция для выполнения запросов к API DonationAlerts.""" global settings access_token = token or await get_valid_access_token() if not access_token: print("Нет действительного access_token для выполнения запроса.") return None headers = {"Authorization": f"Bearer {access_token}"} url = f"{settings.DA_API_BASE_URL}{endpoint}" async with httpx.AsyncClient() as client: try: print(f"Выполнение {method} запроса к {url}") response = await client.request(method, url, headers=headers, **kwargs) # Ошибки 4xx/5xx вызовут исключение  response.raise_for_status() # Успешные ответы без тела (204 No Content)  if response.status_code == 204: return {} # Возвращаем пустой dict для согласованности  return response.json() except httpx.HTTPStatusError as e: print(f"Ошибка API запроса {method} {endpoint}: Статус {e.response.status_code}") try: print(f"Тело ошибки: {e.response.json()}") except Exception: print(f"Тело ошибки (не JSON): {e.response.text}") # Если ошибка 401 Unauthorized, возможно токен протух несмотря на проверку  if e.response.status_code == 401: print("Получена ошибка 401 Unauthorized. Возможно, токен доступа недействителен.") # Можно попробовать обновить токен и повторить запрос, но это усложнит логику.  # Пока просто возвращаем None.  return None except httpx.RequestError as e: print(f"Ошибка сети при запросе к {url}: {e}") return None except Exception as e: print(f"Непредвиденная ошибка при запросе к API: {e}") return None async def connect_and_listen(): """Основная функция подключения к WebSocket и прослушивания сообщений.""" global settings reconnect_delay = 5 # Начальная задержка перед переподключением (в секундах)  while True: # Цикл для автоматического переподключения  access_token = await get_valid_access_token() if not access_token: print("Не удалось получить действительный токен доступа. Невозможно подключиться к WebSocket.") print(f"Повторная попытка через {reconnect_delay} секунд...") await asyncio.sleep(reconnect_delay) reconnect_delay = min(reconnect_delay * 2, 60) # Удваиваем задержку, максимум 60с  continue ws: websockets.WebSocketClientProtocol | None = None # Для finally блока  try: # --- Этап 1: Получение User ID и Socket Token (HTTP API) ---  print("Получение информации о пользователе и токена для WebSocket...") user_info_response = await _make_api_request('GET', '/user/oauth', token=access_token) if not user_info_response or 'data' not in user_info_response: print("Не удалось получить информацию о пользователе или ответ API не содержит 'data'.") # Не увеличиваем задержку, т.к. это может быть временная проблема API  await asyncio.sleep(reconnect_delay) continue # Переходим к следующей итерации цикла переподключения  user_id = user_info_response['data'].get('id') socket_token = user_info_response['data'].get('socket_connection_token') if not user_id or not socket_token: print("В ответе API отсутствует user_id или socket_connection_token.") await asyncio.sleep(reconnect_delay) continue print(f"Получен User ID: {user_id}, Socket Token: {socket_token[:10]}...") # Не логируем полный токен  # --- Этап 2: Подключение и Аутентификация WebSocket ---  websocket_url = settings.DA_CENTRIFUGO_URL print(f"Подключение к WebSocket: {websocket_url}") # Устанавливаем таймаут на пинги, чтобы обнаруживать "мертвые" соединения  ws = await websockets.connect(websocket_url, ping_interval=30, ping_timeout=10) print("Успешно подключено к WebSocket.") reconnect_delay = 5 # Сбрасываем задержку при успешном подключении  # --- Этап 2a: Отправка Socket Token ---  # ВАЖНО: Формат специфичен! {"params": {"token": ...}, "id": 1}  auth_payload = {"params": {"token": socket_token}, "id": 1} await ws.send(json.dumps(auth_payload)) print("Отправлен запрос аутентификации WebSocket.") # --- Этап 2b: Получение ответа аутентификации и Client ID ---  auth_response_str = await asyncio.wait_for(ws.recv(), timeout=10.0) # Ждем ответ 10 сек  auth_response = json.loads(auth_response_str) print(f"Получен ответ аутентификации: {auth_response}") client_id = None if auth_response.get("id") == 1 and auth_response.get("result", {}).get("client"): client_id = auth_response["result"]["client"] print(f"Аутентификация WebSocket успешна! Client ID: {client_id}") else: # Подробное логирование ошибки аутентификации  error_info = auth_response.get("error") print(f"Аутентификация WebSocket не удалась! Ошибка: {error_info}") await ws.close() # Закрываем соединение  continue # Переходим к переподключению  # --- Этап 3: Получение токена подписки (HTTP API) ---  # Канал для донатов: "$alerts:donation_{user_id}"  # Можно подписаться и на другие каналы, например, "$alerts:goal_{user_id}" для целей сбора  channel_name = f"$alerts:donation_{user_id}" print(f"Запрос токена подписки для канала: {channel_name}") sub_token_payload = {"client": client_id, "channels": [channel_name]} sub_response = await _make_api_request('POST', '/centrifuge/subscribe', token=access_token, json=sub_token_payload) if not sub_response or "channels" not in sub_response: print("Не удалось получить ответ с токеном подписки или ответ не содержит 'channels'.") await ws.close() continue subscription_token = None for channel_info in sub_response.get("channels", []): if channel_info.get("channel") == channel_name and "token" in channel_info: subscription_token = channel_info["token"] break if not subscription_token: print(f"Токен подписки для канала {channel_name} не найден в ответе API.") print(f"Ответ API: {sub_response}") await ws.close() continue print(f"Получен токен подписки: {subscription_token[:10]}...") # --- Этап 4: Подписка на канал (WebSocket) ---  subscribe_payload = { "id": 2, # Используем новый ID для этого запроса  "method": 1, # 1 = subscribe  "params": {"channel": channel_name, "token": subscription_token} } await ws.send(json.dumps(subscribe_payload)) print(f"Отправлен запрос подписки на канал {channel_name}") # Ожидание подтверждения подписки (не обязательно, но полезно для отладки)  try: sub_confirm_str = await asyncio.wait_for(ws.recv(), timeout=10.0) sub_confirm = json.loads(sub_confirm_str) if sub_confirm.get("id") == 2 and "result" in sub_confirm: print(f"Подписка на канал {channel_name} подтверждена.") elif sub_confirm.get("id") == 2 and "error" in sub_confirm: print(f"Ошибка подписки на канал {channel_name}: {sub_confirm.get('error')}") await ws.close() continue else: # Это может быть уже сообщение с данными, обработаем его ниже  print(f"Получено сообщение во время ожидания подтверждения подписки: {sub_confirm_str}") handle_donation_message(sub_confirm_str) # Обрабатываем его  except asyncio.TimeoutError: print("Не получено подтверждение подписки в течение 10 секунд.") # Продолжаем слушать, возможно, подписка прошла, но ответ затерялся  # --- Этап 5: Прослушивание сообщений ---  print("Ожидание сообщений о донатах...") async for message_str in ws: # print(f"ПОЛУЧЕНО RAW СООБЩЕНИЕ: {message_str}") # Раскомментировать для детальной отладки  # Теперь парсим сообщение (см. следующий шаг)  handle_donation_message(message_str) except websockets.exceptions.ConnectionClosedOK: print("Соединение WebSocket закрыто штатно.") except websockets.exceptions.ConnectionClosedError as e: print(f"Соединение WebSocket закрыто с ошибкой: Код={e.code}, Причина='{e.reason}'") except asyncio.TimeoutError: print("Таймаут при ожидании ответа от WebSocket.") except httpx.HTTPStatusError as e: # Эти ошибки должны обрабатываться в _make_api_request, но ловим на всякий случай  print(f"Ошибка HTTP API во время установки соединения: Статус {e.response.status_code}") except Exception as e: # Ловим все остальные ошибки (сетевые, JSON и т.д.)  print(f"Произошла ошибка WebSocket: {type(e).__name__}: {e}") # Добавляем traceback для сложных ошибок  import traceback traceback.print_exc() finally: if ws and not ws.closed: await ws.close() print(f"Соединение WebSocket закрыто. Попытка переподключения через {reconnect_delay} секунд...") await asyncio.sleep(reconnect_delay) reconnect_delay = min(reconnect_delay * 2, 60) # Увеличиваем задержку перед следующей попыткой  # --- Вспомогательная функция для обработки сообщений --- def handle_donation_message(message_str: str): """Парсит входящее сообщение и обрабатывает донаты.""" try: message = json.loads(message_str) # Проверяем, является ли это push-сообщением (нет 'id') и имеет ли ожидаемую структуру  if "id" not in message and "result" in message: result_data = message.get("result", {}) channel = result_data.get("channel", "") outer_data = result_data.get("data", {}) # Проверяем, что это сообщение из канала донатов  if channel.startswith("$alerts:donation_") and "data" in outer_data: donation_data = outer_data.get("data") # Это и есть полезная нагрузка доната  if donation_data and isinstance(donation_data, dict): # Извлекаем полезные поля (используем .get для безопасности)  donation_id = donation_data.get("id") # Имя отправителя: сначала username, если нет - name, иначе "Аноним"  sender = donation_data.get("username") or donation_data.get("name") or "Аноним" amount = donation_data.get("amount") currency = donation_data.get("currency") text = donation_data.get("message", "") created_at_str = donation_data.get("created_at") # "YYYY-MM-DD HH:MM:SS"  # Конвертируем строку времени в объект datetime (опционально)  created_at_dt = None if created_at_str: try: created_at_dt = datetime.strptime(created_at_str, "%Y-%m-%d %H:%M:%S") except ValueError: print(f"Не удалось распарсить дату: {created_at_str}") print("-" * 20) print(f"🎉 Новый донат! (ID: {donation_id})") print(f" От: {sender}") print(f" Сумма: {amount} {currency}") # Убираем лишние пробелы и переносы строк из сообщения  print(f" Сообщение: {text.strip() if text else '(пусто)'}") print(f" Время: {created_at_str} ({created_at_dt.strftime('%H:%M:%S') if created_at_dt else 'N/A'})") print("-" * 20) # --- TODO: Отправьте эти данные в ваш фронтенд или вызовите действия! ---  # Пример: await websocket_manager.broadcast({"type": "donation", "data": donation_data})  # Или запишите в базу данных, отправьте уведомление и т.д.  else: # Странная структура данных внутри data  print(f"Получено сообщение в канале донатов, но 'data' имеет неожиданный формат: {outer_data}") elif channel.startswith("$alerts:donation_"): # Может быть сообщение о подписке/отписке или другое системное сообщение на канале  print(f"Получено не-донатное push-сообщение на канале донатов: {message}") else: # Сообщение из другого канала (если мы на него подписаны)  print(f"Получено push-сообщение из другого канала ({channel}): {message}") elif "id" in message: # Это, вероятно, подтверждение одного из наших запросов (id: 1 или 2) или пинг/понг  # Можно добавить логику для обработки подтверждений или ошибок по ID  if "error" in message: print(f"Получена ошибка в ответ на запрос ID {message['id']}: {message['error']}") # else:  # print(f"Получено подтверждение/ответ на запрос ID {message['id']}: {message.get('result', message)}")  else: # Неизвестный формат сообщения  print(f"Получен необрабатываемый формат сообщения: {message_str}") except json.JSONDecodeError: print(f"Ошибка декодирования JSON: {message_str}") except Exception as e: # Ловим другие возможные ошибки при обработке сообщения  print(f"Ошибка при обработке сообщения: {type(e).__name__}: {e}") import traceback traceback.print_exc() # --- Запуск клиента --- # if __name__ == "__main__": # # TODO: Загрузить токены перед запуском # # load_tokens() # try: # asyncio.run(connect_and_listen()) # except KeyboardInterrupt: # print("Клиент остановлен вручную.")  
Enter fullscreen mode Exit fullscreen mode

Уф! Это самая сложная часть. Правильная реализация этой последовательности — ключ к успеху.

Шаг 5: Обработка сообщений о донатах 📨

После подписки DonationAlerts будет отправлять сообщения через WebSocket без поля id. Фактические данные о донате вложены внутрь.

Формат сообщения:

Исходя из наблюдений, типичное сообщение о донате выглядит так:

{ "result": { "channel": "$alerts:donation_ВАШ_USER_ID", "data": { "seq": 15, // Порядковый номер от Centrifugo "data": { // ---> ЭТО и есть сам объект доната <--- "id": 164405432, // Уникальный ID доната! Важно для дедупликации. "name": "ИмяДонатера", // Имя, введенное донатером (запасной вариант) "username": "ИмяПользователяDA", // Имя пользователя DA, если авторизован, иначе null "message": "Ваше сообщение здесь! \r\n Может содержать переносы строк.", "message_type": "text", // Тип сообщения (может быть 'audio') "amount": 22.0, // Сумма в валюте доната "currency": "USD", // Валюта доната "is_shown": 1, // Был ли показан в стандартных виджетах DA (0 или 1) "amount_in_user_currency": 1500.0, // Сумма в основной валюте стримера "recipient_name": "ИмяВашегоСтримера", // Публичное имя получателя "recipient": { // Информация о получателе (вас) "user_id": ВАШ_USER_ID, // Ваш ID пользователя в DA "code": "ВашЛогинСтримера", // Ваш логин/идентификатор канала "name": "ИмяВашегоСтримера", // Ваше публичное имя "avatar": "URL_к_аватару" // URL вашего аватара }, "created_at": "2025-04-12 07:15:49", // Время создания доната на сервере (UTC?) "updated_at": "2025-04-12 07:15:55", // Время последнего обновления (например, показа) "shown_at": "2025-04-12 07:15:55", // Время показа (если is_shown=1) "reason": "Donation", // Причина (может отличаться для других типов событий) "billing_system": "unitpay", // Платежная система (если доступно) "billing_system_type": "card" // Тип платежа (если доступно) // ... потенциально другие поля ... } } } } 
Enter fullscreen mode Exit fullscreen mode

Парсинг на Python:

(См. функцию handle_donation_message в коде выше. Она уже включает парсинг и извлечение основных полей.)

# def handle_donation_message(message_str): # ... (код парсинга из предыдущего блока) ... 
Enter fullscreen mode Exit fullscreen mode

Не забывайте обрабатывать потенциально отсутствующие поля с помощью .get(). Поле id критически важно, если вы хотите предотвратить обработку одного и того же доната несколько раз (DonationAlerts иногда может присылать дубликаты, особенно при переподключениях).

(Опционально) Шаг 6: Отображение донатов 🖥️

Приведенный выше код Python просто выводит донат в консоль. Чтобы показать его в веб-интерфейсе, вам обычно потребуется еще одно WebSocket-соединение, но уже между вашим бэкендом и вашим фронтендом.

  1. Бэкенд (websocket_manager.py / main.py): Когда handle_donation_message обрабатывает донат, вместо print используйте WebSocketManager (или аналогичный механизм), чтобы отправить (broadcast) donation_data всем подключенным фронтенд-клиентам.
  2. Фронтенд (index.html / JavaScript): Установите WebSocket-соединение с вашим бэкендом (например, по адресу /ws). Слушайте сообщения. Когда приходит сообщение с type: "donation", используйте JavaScript, чтобы распарсить data и добавить его в HTML.
// Упрощенный пример JS для фронтенда // Подключаемся к WebSocket-эндпоинту вашего бэкенда const wsUrl = `ws://${window.location.host}/ws`; // Или wss:// для HTTPS console.log(`Подключение к WebSocket: ${wsUrl}`); let ws = new WebSocket(wsUrl); ws.onopen = function(event) { console.log("WebSocket соединение с бэкендом установлено."); // Можно отправить сообщение бэкенду при подключении, если нужно // ws.send(JSON.stringify({type: "hello", client: "web-ui"})); }; ws.onmessage = function(event) { try { const message = JSON.parse(event.data); console.log("Сообщение от бэкенда:", message); // Для отладки if (message.type === 'donation' && message.data) { // Функция для добавления доната на страницу displayDonation(message.data); } else { console.log("Получено сообщение другого типа:", message.type); } } catch (e) { console.error("Ошибка обработки сообщения от бэкенда:", e, event.data); } }; ws.onerror = function(event) { console.error("Ошибка WebSocket:", event); }; ws.onclose = function(event) { console.log(`WebSocket соединение закрыто: Код=${event.code}, Причина='${event.reason}', Чисто=${event.wasClean}`); // Здесь можно реализовать логику переподключения для фронтенда, если необходимо // setTimeout(reconnectWebSocket, 5000); // Попытка переподключения через 5 секунд }; function displayDonation(donation) { const log = document.getElementById('donations-log'); // Убедитесь, что у вас есть <div id="donations-log"></div> в HTML if (!log) return; // Выход, если элемент не найден const item = document.createElement('div'); item.classList.add('donation-item'); // Добавим класс для стилизации // Используем имя пользователя, если есть, иначе имя, иначе "Аноним" const sender = escapeHtml(donation.username || donation.name || 'Аноним'); const amount = escapeHtml(String(donation.amount)); // Преобразуем в строку перед экранированием const currency = escapeHtml(donation.currency); // Экранируем сообщение и заменяем переносы строк на <br> const messageText = donation.message ? escapeHtml(donation.message).replace(/\r?\n/g, '<br>') : ''; // Формируем HTML для отображения item.innerHTML = ` <span class="sender"><strong>${sender}</strong></span> <span class="amount">задонатил ${amount} ${currency}</span> ${messageText ? `<p class="message">${messageText}</p>` : ''} <span class="timestamp">(${donation.created_at || 'время неизвестно'})</span> `; // Добавляем новый донат в начало списка (или в конец, если нужно) log.prepend(item); // Или log.appendChild(item); // Опционально: ограничить количество отображаемых донатов const maxItems = 50; while (log.children.length > maxItems) { log.removeChild(log.lastChild); } } // Простая функция для экранирования HTML, чтобы предотвратить XSS function escapeHtml(unsafe) { if (unsafe === null || typeof unsafe === 'undefined') return ''; return String(unsafe) // Убедимся, что это строка .replace(/&/g, "&amp;") .replace(/</g, "&lt;") .replace(/>/g, "&gt;") .replace(/"/g, "&quot;") .replace(/'/g, "&#039;"); } // Функция для переподключения (если нужна) // function reconnectWebSocket() { // console.log("Попытка переподключения WebSocket..."); // ws = new WebSocket(wsUrl); // // Переназначаем обработчики событий для нового объекта ws // ws.onopen = ... // ws.onmessage = ... // ws.onerror = ... // ws.onclose = ... // } 
Enter fullscreen mode Exit fullscreen mode

Заключение ✨

Мы рассмотрели основные шаги для получения уведомлений от DonationAlerts в реальном времени с использованием OAuth 2.0 и WebSockets на Python. Это включает настройку приложения, обработку многоэтапного процесса подключения OAuth и WebSocket (включая эти специфичные токены!) и парсинг входящих сообщений о донатах.

Хотя настройка более сложна, чем простой опрос или старый метод с токеном виджета, она дает вам стандартный, надежный способ прямой интеграции с потоком событий DonationAlerts.

Top comments (1)

Collapse
 
extressar_016ed25aa478446 profile image
Extressar • Edited

Спасибо за разбор ибо у ДА довольно запутаная дока и процесс подписки на донаты.
Есть вопрос, а реально ли socket_connection_token временный? Если он постоянный то появляется возможность единожды определять пользователя, а после подключатся к донатам без допольнительных запросов к апи.