"""Scrape GMGN pump-rank data through a headed browser, solving Cloudflare challenges."""
import asyncio
import json
import logging
import re
import time
from datetime import datetime
from enum import Enum
from traceback import format_exc
from typing import Optional

import pymysql  # noqa: F401  (unused while DB persistence is disabled; kept for when it returns)

# patchright is used as a drop-in replacement for playwright.
from patchright.async_api import Error as PlaywrightError
from patchright.async_api import Frame, async_playwright

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('GMGN Holders Tag')

# Runtime environment selector read by ``PumpRanks.task``: 'local' runs the
# browser directly, anything else goes through the virtual-display path.
# Defined at module level so importing this module and calling ``task()`` works
# (previously it only existed when the file was executed as a script).
env = 'local'


class ChallengePlatform(Enum):
    """Cloudflare challenge platform types."""

    JAVASCRIPT = "non-interactive"
    MANAGED = "managed"
    INTERACTIVE = "interactive"


class PumpRanks:
    """
    Spider for GMGN token data.

    The original description states the goal as fetching the top-100 holders of
    trending tokens from GMGN together with their tags and historical
    performance. The code in this file intercepts the ``pump_ranks`` API
    response and gets past the Cloudflare challenge guarding the page.
    """

    spider_name = 'gmgn_tags'

    # Page opened by ``run_local``.
    target_url = 'https://larkfive.sg.larksuite.com/wiki/Yrh5wmnEji6h4nkjjLflj8BJgrc'

    # Parameterized insert for one token row; column order matches the tuple
    # produced by ``_build_token_row``.
    INSERT_TOKEN_SQL = """
        INSERT INTO pump_token_info (
            address, symbol, usd_market_cap, created_timestamp, holder_count,
            top_10_holder_rate, twitter, website, crawler_timestamp
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
    """

    # Compiled once instead of on every call; patterns are unchanged.
    _TURNSTILE_URL_PATTERN = re.compile(
        "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
    )
    _VERIFY_BUTTON_PATTERN = re.compile(
        "Verify (I am|you are) (not a bot|(a )?human)"
    )

    def __init__(self):
        # Seconds; used both as the challenge-solving budget and as the
        # default Playwright timeout for the browser context.
        self._timeout = 30

    @staticmethod
    def _build_token_row(token, crawler_timestamp):
        """
        Convert one ``completeds`` entry into a row tuple for ``INSERT_TOKEN_SQL``.

        Raises ``KeyError`` if a required field is missing and
        ``TypeError``/``ValueError`` if ``created_timestamp`` is not int-like.
        """
        return (
            token['address'],
            token['symbol'],
            token['usd_market_cap'],
            datetime.fromtimestamp(int(token['created_timestamp'])),
            token['holder_count'],
            token['top_10_holder_rate'],
            token.get('twitter', '无'),
            token.get('website', '无'),
            crawler_timestamp,
        )

    async def on_response(self, response):
        """
        Response hook: capture the ``pump_ranks`` payload and build DB rows.

        Non-OK responses and unrelated URLs are ignored. The original docstring
        refers to ``gmgn.json`` for the payload structure; the code reads
        ``payload['data']['completeds']`` as a list of token dicts.

        Returns
        -------
        None
        """
        if not response.ok:
            return
        if '/v1/rank/sol/pump_ranks/1h' not in response.url:
            return

        logger.info(f'捕获 pump_ranks 数据接口: {response.url}')

        # This runs as an event callback, so a malformed body must not raise
        # out of the handler; log it and move on.
        try:
            payload = json.loads(await response.body())
            completeds = payload['data']['completeds']
        except (PlaywrightError, ValueError, KeyError, TypeError) as err:
            logger.warning('Could not parse pump_ranks response %s: %s', response.url, err)
            return

        # One crawl timestamp for the whole batch (local time).
        crawler_timestamp = time.strftime('%Y-%m-%d %H:%M:%S')

        for c in completeds:
            logger.info(f'代币--> {c}')
            try:
                row = self._build_token_row(c, crawler_timestamp)
            except (KeyError, TypeError, ValueError) as err:
                # Skip a single malformed token instead of dropping the batch.
                logger.warning('Skipping malformed token entry: %r', err)
                continue
            # TODO: persistence is disabled. The original code had the
            # execute/commit calls commented out and this class never opens a
            # DB connection. Once one exists:
            #   self.cursor.execute(self.INSERT_TOKEN_SQL, row)
            #   self.connection.commit()
            del row

    def _get_turnstile_frame(self, page) -> Optional[Frame]:
        """
        Get the Cloudflare turnstile frame.

        Returns
        -------
        Optional[Frame]
            The frame whose URL matches the turnstile pattern, or None.
        """
        return page.frame(url=self._TURNSTILE_URL_PATTERN)

    async def cookies(self, page) -> Optional[str]:
        """Return the value of the ``cf_clearance`` cookie, or None if it is not set."""
        for cookie in await page.context.cookies():
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """
        Detect the Cloudflare challenge platform on the current page.

        Returns
        -------
        Optional[str]
            The ``ChallengePlatform`` value whose ``cType: '<value>'`` marker
            appears in the page HTML, or None if no marker is found.
        """
        html = await page.content()
        for platform in ChallengePlatform:
            if f"cType: '{platform.value}'" in html:
                return platform.value
        return None

    async def solve_challenge(self, page) -> None:
        """
        Solve the Cloudflare challenge on the current page.

        Loops until the ``cf_clearance`` cookie is present, no challenge marker
        remains in the page, or ``self._timeout`` seconds have elapsed.
        Playwright errors (including wait timeouts) propagate to the caller.
        """
        verify_button = page.get_by_role("button", name=self._VERIFY_BUTTON_PATTERN)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")
        deadline = time.monotonic() + self._timeout

        while time.monotonic() < deadline:
            # Re-evaluate on every pass. Checking these only once before the
            # loop meant a solved challenge was never noticed and the loop
            # always ran until the timeout.
            if await self.cookies(page) is not None:
                return
            if await self.detect_challenge(page) is None:
                return

            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")

            turnstile_frame = self._get_turnstile_frame(page)

            if await verify_button.is_visible():
                await verify_button.click()
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                # Fixed viewport coordinates; presumably where the turnstile
                # checkbox renders (TODO confirm for the target page).
                await page.mouse.click(210, 290)
                await challenge_stage.wait_for(state="hidden")

            await page.wait_for_timeout(250)

    async def detect(self, page):
        """
        Get past Cloudflare: if no clearance cookie exists, detect and solve the challenge.

        Playwright errors raised while solving are logged, not re-raised.
        """
        if await self.cookies(page) is not None:
            return

        challenge_platform = await self.detect_challenge(page)
        if challenge_platform is None:
            logger.error("No Cloudflare challenge detected.")
            return

        logger.info(f"Solving Cloudflare challenge [{challenge_platform}]...")
        try:
            await self.solve_challenge(page)
        except PlaywrightError as err:
            logger.error(err)

    async def run_local(self, proxy=None):
        """
        Launch a headed Chromium, open ``target_url`` and keep the page alive.

        Parameters
        ----------
        proxy : optional
            Passed through unchanged as the ``proxy`` launch option.
        """
        async with async_playwright() as p:
            # Must be a headed browser; per the original note, headless does
            # not get past Cloudflare.
            launch_data = {
                "headless": False,
                "proxy": proxy,
                "args": [
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-first-run',
                    '--no-default-browser-check',
                    '--disable-infobars',
                    '--disable-extensions',
                    '--remote-debugging-port=9222',
                    '--disable-features=VizDisplayCompositor',
                ],
            }
            user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
            browser = await p.chromium.launch(**launch_data)
            try:
                context = await browser.new_context(user_agent=user_agent)
                context.set_default_timeout(self._timeout * 1000)
                page = await context.new_page()
                # Listen to the response stream.
                page.on('response', self.on_response)
                # Visit the target address.
                await page.goto(self.target_url)
                # Anti-bot workaround: per the original note, without this
                # reload the page stays blocked.
                await page.reload()
                await asyncio.sleep(10)
                await self.detect(page)
                # Keep the browser open: an in-page timer sets window.x after
                # 24 * 60 * 60 * 1000 ms (24 hours; the original comment said
                # one hour), and we wait for it with Playwright's timeout
                # disabled.
                await page.evaluate("setTimeout(() => window.x = 5, 24 * 60 * 60 * 1000)")
                await page.wait_for_function("() => window.x > 0", timeout=0)
            finally:
                # Release the browser even when navigation or waiting fails.
                await browser.close()

    async def run_aws(self):
        """
        Run on an AWS server inside a virtual display.

        Errors from the browser session are logged and swallowed so the outer
        ``run`` loop can restart it; cancellation and KeyboardInterrupt are not
        caught.
        """
        from pyvirtualdisplay import Display

        with Display():
            try:
                await self.run_local()
            except Exception:
                logger.error(f'浏览器异常:{format_exc()}')

    def task(self):
        """Run one browser session, choosing the runner from the module-level ``env``."""
        if env == 'local':
            asyncio.run(self.run_local())
        else:
            asyncio.run(self.run_aws())

    def run(self):
        """Run sessions forever, pausing 60 seconds between them."""
        while True:
            self.task()
            logger.info('浏览器等待下一次启动')
            time.sleep(60)


if __name__ == '__main__':
    PumpRanks().run()
Top comments (0)