DEV Community

drake
drake

Posted on • Edited on

boss职位监控

import asyncio
import json
import logging
import re
from datetime import datetime
from enum import Enum
from typing import Optional
from urllib.parse import urlsplit

# patchright is used as a drop-in replacement for playwright
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError
from retry import retry

from utils.redisdb import redis_cli
from config import env
from utils.lark_bot import sender
from other_spider.scheduler import scheduled_task

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Boss Anouncement')


class ChallengePlatform(Enum):
    """Cloudflare challenge platform types."""

    JAVASCRIPT = "non-interactive"
    MANAGED = "managed"
    INTERACTIVE = "interactive"


class BossAlert:
    """
    Monitor zhipin.com (Boss Zhipin) job listings and push matching jobs to Lark.

    The search page is opened in a headed Chromium browser for every city code
    in ``self.codes``; ``joblist.json`` responses are intercepted, filtered by
    job name and salary, de-duplicated through a Redis set and sent to the
    Lark webhook.

    NOTE(review): ``spider_name`` and ``black_list_key`` still carry Binance
    naming, presumably inherited from an earlier spider. They are runtime
    values (the Redis key holds the de-duplication state), so they are left
    untouched here.
    """

    spider_name = 'BinanceAlert To Research'
    author = 'drake.shi'

    def __init__(self):
        self.redis_cli = redis_cli()
        # Redis set holding the job ids that were already alerted
        self.black_list_key = 'binance:listing:black'
        # Proxy configuration
        self.PROXY_URL = 'h5'
        # Proxy URL converted to the dict format playwright expects
        self.proxy = self._parse_proxy_url(self.PROXY_URL)
        # Timeout in seconds (converted to ms for the browser context)
        self._timeout = 30
        self.api = 'https://www.zhipin.com/web/geek/jobs?city={}&query=%E5%90%88%E7%BA%A6%E5%B7%A5%E7%A8%8B%E5%B8%88'
        self.lark_hook = 'https://open.larksuite.com/open-apis/bot/v2/hook/141f'
        # City codes substituted into ``self.api``
        self.codes = [100010000, 101010100, 101020100, 101280100, 101280600, 101210100, 101030100, 101110100,
                      101190400, 101200100, 101230200, 101250100, 101270100, 101180100, 101040100]

    def _parse_proxy_url(self, proxy_url):
        """
        Convert a proxy URL into the proxy dict playwright expects.

        Accepts ``scheme://username:password@host:port``, ``scheme://host:port``
        and the scheme-less ``host:port`` short form (treated as HTTP).

        Parameters
        ----------
        proxy_url : str
            The proxy URL; may be empty or ``None``.

        Returns
        -------
        Optional[dict]
            ``{"server": ...}`` plus ``"username"`` / ``"password"`` when the
            URL carries credentials, or ``None`` when no proxy is configured or
            the value has no usable ``host:port`` (a warning is logged in that
            case so the spider can still start without a proxy).
        """
        if not proxy_url:
            return None

        # urlsplit only recognises the authority part when a scheme is present
        candidate = proxy_url if '://' in proxy_url else f'http://{proxy_url}'
        try:
            parts = urlsplit(candidate)
            host = parts.hostname
            port = parts.port  # raises ValueError on a non-numeric port
        except ValueError:
            host = port = None

        if not host or port is None:
            logger.warning('Ignoring malformed proxy URL (expected [scheme://][user:pass@]host:port)')
            return None

        if ':' in host:
            # IPv6 literal: hostname drops the brackets, restore them
            host = f'[{host}]'

        proxy = {"server": f"{parts.scheme}://{host}:{port}"}
        if parts.username:
            proxy["username"] = parts.username
            if parts.password is not None:
                proxy["password"] = parts.password
        return proxy

    def _get_turnstile_frame(self, page) -> Optional[Frame]:
        """
        Get the Cloudflare turnstile frame.

        Returns
        -------
        Optional[Frame]
            The Cloudflare turnstile frame, or None when the page has none.
        """
        frame = page.frame(
            url=re.compile(
                "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
            ),
        )
        return frame

    async def cookies(self, page) -> Optional[str]:
        """Return the value of the ``cf_clearance`` cookie, or None if absent."""
        for cookie in await page.context.cookies():
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """
        Detect the Cloudflare challenge platform on the current page.

        Returns
        -------
        Optional[str]
            The ``ChallengePlatform`` value whose ``cType`` marker appears in
            the page HTML, or None when no marker is found.
        """
        html = await page.content()
        for platform in ChallengePlatform:
            if f"cType: '{platform.value}'" in html:
                return platform.value
        return None

    async def solve_challenge(self, page) -> None:
        """
        Solve the Cloudflare challenge on the current page.

        Polls until the clearance cookie appears, the challenge marker
        disappears, or ``self._timeout`` seconds have elapsed. Playwright
        errors propagate to the caller.
        """
        verify_button_pattern = re.compile(
            "Verify (I am|you are) (not a bot|(a )?human)"
        )
        verify_button = page.get_by_role("button", name=verify_button_pattern)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")
        start_timestamp = datetime.now()

        while (datetime.now() - start_timestamp).total_seconds() < self._timeout:
            # Re-read the state on every pass; checking it only once before
            # the loop would keep spinning until the timeout even after the
            # challenge has been solved.
            clearance = await self.cookies(page)
            if clearance is not None:
                break
            challenge_type = await self.detect_challenge(page)
            if challenge_type is None:
                break

            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")

            turnstile_frame = self._get_turnstile_frame(page)

            if await verify_button.is_visible():
                await verify_button.click()
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                # Fixed viewport coordinates used to click the turnstile widget
                await page.mouse.click(210, 290)
                await challenge_stage.wait_for(state="hidden")

            await page.wait_for_timeout(250)

    async def detect(self, page):
        """
        Bypass Cloudflare: if no clearance cookie is present, detect the
        challenge on the page and try to solve it. Playwright errors raised
        while solving are logged, not propagated.
        """
        clearance_cookie = await self.cookies(page)
        if clearance_cookie is not None:
            return

        challenge_platform = await self.detect_challenge(page)
        if challenge_platform is None:
            logger.error("No Cloudflare challenge detected.")
            return

        logger.info(f"Solving Cloudflare challenge [{challenge_platform}]...")
        try:
            await self.solve_challenge(page)
        except PlaywrightError as err:
            logger.error(err)

    def parse(self, data):
        """
        Parse job data and send alerts.

        A job is alerted when its name contains one of the keywords, its
        salary description contains ``K`` with an upper bound of at least 20,
        and its id is not yet in the Redis set ``self.black_list_key``.

        Parameters
        ----------
        data : dict
            Decoded ``joblist.json`` payload; jobs are read from
            ``data['zpData']['jobList']``. A missing or empty list is a no-op.
        """
        name_check_list = [
            '智能合约',
            'Solidity',
            '区块链合约',
        ]
        job_list = (data.get('zpData') or {}).get('jobList') or []
        for item in job_list:
            salary_desc = item['salaryDesc']
            job_id = item['encryptJobId']
            url = f'https://www.zhipin.com/job_detail/{job_id}.html'
            job_name = item['jobName']

            # Check whether the job name matches any keyword
            if not any(keyword in job_name for keyword in name_check_list):
                logger.info(f'不匹配:{job_name}')
                continue

            # Salaries not expressed in K are treated as too low and skipped
            if 'K' not in salary_desc:
                logger.info(f'薪资太低:{salary_desc} {job_name}')
                continue

            try:
                # Upper bound of a range such as "20-40K"
                salary_upper = int(salary_desc.split('-')[-1].split('K')[0])
            except ValueError:
                logger.info(f'薪资解析异常:{salary_desc} {job_name}')
                continue

            if salary_upper < 20:
                logger.info(f'薪资太低:{salary_desc} {job_name}')
                continue

            brand_name = item['brandName']
            brand_scale_name = item['brandScaleName']
            city_name = item['cityName']
            content = f"{salary_desc}\n{job_name}\n{brand_name}\n{brand_scale_name}\n{city_name}\n{url}"

            already_alerted = self.redis_cli.sismember(self.black_list_key, job_id)
            if already_alerted:
                logger.info(f'已告警,过滤:{content}')
                continue

            sender(content, self.lark_hook)
            self.redis_cli.sadd(self.black_list_key, job_id)
            logger.info(content)

    async def on_response(self, response):
        """
        Response listener: decode every successful ``joblist.json`` response
        and hand it to ``parse``. Failures are logged and never propagate into
        the browser event loop.
        """
        if not response.ok:
            return
        if 'joblist.json' not in response.url:
            return

        try:
            raw_body = await response.body()
            format_data = json.loads(raw_body)
            print(format_data)
            self.parse(format_data)
        except Exception:
            # Best effort, but keep the traceback instead of silently
            # discarding it; cancellation is deliberately not caught.
            logger.exception('Failed to handle job list response: %s', response.url)

    async def run_local(self, proxy=None):
        """
        Open every city search page in a headed Chromium browser.

        Parameters
        ----------
        proxy : Optional[dict]
            Playwright proxy dict (see ``_parse_proxy_url``); no proxy is used
            when falsy.
        """
        async with async_playwright() as p:
            # Must be a headed browser, otherwise Cloudflare is not passed
            launch_data = {
                "headless": False,
            }
            # Add the proxy to the launch arguments when one is configured
            if proxy:
                launch_data["proxy"] = proxy
                logger.info(f"使用代理: {proxy['server']}")

            user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
            browser = await p.chromium.launch(**launch_data)
            try:
                # Create a new context
                context = await browser.new_context(user_agent=user_agent)
                context.set_default_timeout(self._timeout * 1000)
                page = await context.new_page()
                await page.add_init_script("Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});")
                # Listen to the response stream
                page.on('response', self.on_response)

                for code in self.codes:
                    url = self.api.format(code)
                    logger.info(f'GET {url}')
                    # Visit the target address
                    await page.goto(url)
                    await asyncio.sleep(3)
                    await self.detect(page)
                    # NOTE(review): the published source lost its indentation,
                    # so it is unclear whether this wait runs once per city or
                    # once after the loop; it is kept per city here - confirm.
                    await asyncio.sleep(60)

                await context.close()
            finally:
                # Always release the browser, even when a navigation fails
                logger.info('关闭浏览器')
                await browser.close()

    async def run_aws(self):
        """
        Run on an AWS server: wrap ``run_local`` in a pyvirtualdisplay
        ``Display`` and use the configured proxy.
        """
        from pyvirtualdisplay import Display

        with Display():
            await self.run_local(self.proxy)

    @retry(tries=3, delay=3)
    def task(self):
        """Run one scraping pass; ``env == 'local'`` selects the local runner."""
        if env == 'local':
            asyncio.run(self.run_local())
        else:
            asyncio.run(self.run_aws())

    @scheduled_task(start_time=None, duration=20*60)
    def run(self):
        """
        Scheduled entry point: collect job listings through the browser.
        """
        self.task()
Enter fullscreen mode Exit fullscreen mode

Top comments (0)