DEV Community

drake
drake

Posted on

claude code写的代码

Image description

  • 需求是补充原表的粉丝数数据
#!/usr/bin/env python # -*- coding: utf-8 -*-  import csv import json import re import time import requests import os import pandas as pd from tqdm import tqdm class TwitterFollowerExtractor: """ A class to extract Twitter follower counts from a CSV file. """ # API configuration  X_RAPIDAPI_KEY = "xxx" RAPIDAPI_HOST = "twitter-v1-1-v2-api.p.rapidapi.com" ENDPOINT = "https://twitter-v1-1-v2-api.p.rapidapi.com/graphql/UserByScreenName" def __init__(self, csv_file_path): """ Initialize the extractor with the path to the CSV file. :param csv_file_path: The path to the CSV file. """ self.csv_file_path = csv_file_path self.df = None def _extract_twitter_username(self, url): """Extract Twitter username from URL.""" if not url: return None # Handle both profile URLs and status URLs  status_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/' profile_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/?]+)' # First try to extract from status URL  match = re.search(status_pattern, url) if match: username = match.group(1) # Skip if username is "status" (malformed URL)  if username.lower() == 'status': return None return username # Then try to extract from profile URL  match = re.search(profile_pattern, url) if match: username = match.group(1) # Remove any query parameters if present  username = username.split('?')[0] # Skip if username is "status" (malformed URL)  if username.lower() == 'status': return None return username return None def _get_follower_count(self, username): """Get follower count for a Twitter username using RapidAPI with retry logic.""" if not username: return None headers = { "X-RapidAPI-Key": self.X_RAPIDAPI_KEY, "X-RapidAPI-Host": self.RAPIDAPI_HOST } # Prepare variables according to the correct API format  variables = { "screen_name": username, "withSafetyModeUserFields": True, "withHighlightedLabel": True } querystring = {"variables": json.dumps(variables)} # Implement retry logic  max_retries = 3 retry_delay = 2 # seconds  for attempt in range(max_retries): try: response = requests.get(self.ENDPOINT, headers=headers, params=querystring) if response.status_code == 200: data = response.json() # Extract follower count from the response using the correct path  if "data" in data and "user" in data["data"] and data["data"]["user"]: user_result = data["data"]["user"]["result"] if "legacy" in user_result: return user_result["legacy"]["followers_count"] else: print(f"No user data found for {username}") else: print(f"API request failed for {username} (Attempt {attempt+1}/{max_retries}): Status code {response.status_code}") # If we're not on the last attempt, wait before retrying  if attempt < max_retries - 1: print(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) except Exception as e: print(f"Error fetching data for {username} (Attempt {attempt+1}/{max_retries}): {e}") # If we're not on the last attempt, wait before retrying  if attempt < max_retries - 1: print(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) return None def _backup_file(self): """Create a backup of the original CSV file.""" backup_file = self.csv_file_path + '.backup' try: with open(self.csv_file_path, 'rb') as src, open(backup_file, 'wb') as dst: dst.write(src.read()) print(f"Created backup of original file at {backup_file}") except Exception as e: print(f"Warning: Could not create backup file: {e}") def _load_csv(self): """Load the CSV file into a pandas DataFrame with enhanced compatibility.""" try: # Try different encoding methods for better compatibility  encodings = ['utf-8-sig', 'utf-8', 'gbk', 'gb2312', 'latin-1'] df_loaded = False for encoding in encodings: try: self.df = pd.read_csv(self.csv_file_path, encoding=encoding) df_loaded = True print(f"Successfully loaded CSV with {len(self.df)} rows using {encoding} encoding.") break except UnicodeDecodeError: continue except Exception as e: print(f"Error with {encoding} encoding: {e}") continue if not df_loaded: print("Failed to load CSV with any encoding method.") return False # Clean up the DataFrame columns and data  self._clean_dataframe() return True except Exception as e: print(f"Error reading CSV file: {e}") return False def _clean_dataframe(self): """Clean the DataFrame to handle malformed data.""" # Clean column names by removing newlines and extra whitespace  self.df.columns = [col.strip().replace('\n', '').replace('\r', '') for col in self.df.columns] # Clean the '粉丝数' column if it exists  if '粉丝数' in self.df.columns: # Remove newlines and extra whitespace from the follower count column  self.df['粉丝数'] = self.df['粉丝数'].astype(str).str.strip().str.replace('\n', '').str.replace('\r', '') # Replace empty strings with None  self.df['粉丝数'] = self.df['粉丝数'].replace('', None) # Clean other string columns  for col in self.df.columns: if self.df[col].dtype == 'object': self.df[col] = self.df[col].astype(str).str.strip().str.replace('\n', '').str.replace('\r', '') # Replace 'nan' strings with None  self.df[col] = self.df[col].replace('nan', None) def _save_csv(self): """Save the updated DataFrame back to the CSV file.""" try: self.df.to_csv(self.csv_file_path, index=False, encoding='utf-8-sig') print(f"Process completed. Follower counts have been updated in {self.csv_file_path}.") except Exception as e: print(f"Error saving updated CSV: {e}") print("Please check the backup file if needed.") def _generate_summary(self, processed_count): """Generate and print a summary of the results.""" if '粉丝数' in self.df.columns: total_updated = self.df['粉丝数'].notna().sum() print(f"\nSummary:") print(f"Total Twitter accounts processed: {processed_count}") print(f"Successfully updated follower counts: {total_updated}") print(f"Failed to update follower counts: {processed_count - total_updated}") # Print top 10 accounts by follower count  if total_updated > 0: print("\nTop 10 accounts by follower count:") top_accounts = self.df[self.df['粉丝数'].notna()].sort_values('粉丝数', ascending=False).head(10) for _, row in top_accounts.iterrows(): url_value = row['url'] if 'url' in row and pd.notna(row['url']) else "N/A" followers = row['粉丝数'] if pd.notna(row['粉丝数']) else 0 # Clean the followers value and convert to int safely  try: # Remove any whitespace and newlines  followers_str = str(followers).strip() if followers_str and followers_str != 'nan': followers_int = int(float(followers_str)) print(f"- {self._extract_twitter_username(url_value)}: {followers_int} followers") except (ValueError, TypeError) as e: print(f"- {self._extract_twitter_username(url_value)}: Unable to parse follower count ({followers})") def process_followers(self): """ Main method to run the follower extraction process. """ print("Starting Twitter follower count extraction...") if not os.path.exists(self.csv_file_path): print(f"Error: File {self.csv_file_path} not found.") return self._backup_file() if not self._load_csv(): return usernames_to_process = [] for idx, row in self.df.iterrows(): twitter_url = None try: if 'ext_info' in row and pd.notna(row['ext_info']): ext_info = json.loads(row['ext_info']) if 'twitterUrl' in ext_info and ext_info['twitterUrl']: twitter_url = ext_info['twitterUrl'] except Exception as e: print(f"Error parsing ext_info for row {idx}: {e}") if not twitter_url and 'url' in self.df.columns and pd.notna(row['url']): twitter_url = row['url'] if twitter_url: username = self._extract_twitter_username(twitter_url) if username: usernames_to_process.append((idx, username, twitter_url)) print(f"Found {len(usernames_to_process)} Twitter usernames to process.") for idx, username, url in tqdm(usernames_to_process, desc="Fetching follower counts"): # Check if we already have a valid follower count  if '粉丝数' in self.df.columns and pd.notna(self.df.at[idx, '粉丝数']): existing_value = str(self.df.at[idx, '粉丝数']).strip() if existing_value and existing_value not in ['#VALUE!', 'nan', '', '\n']: try: # Try to convert to int to verify it's a valid number  int(float(existing_value)) print(f"Skipping {username} - already has follower count: {existing_value}") continue except (ValueError, TypeError): # If conversion fails, we'll fetch new data  pass follower_count = self._get_follower_count(username) if follower_count is not None: if '粉丝数' not in self.df.columns: self.df['粉丝数'] = None self.df.at[idx, '粉丝数'] = follower_count print(f"Updated {username} with {follower_count} followers") else: print(f"Could not get follower count for {username}") self._save_csv() self._generate_summary(len(usernames_to_process)) if __name__ == "__main__": # File path  CSV_FILE_PATH = "用户活动报名信息.csv" extractor = TwitterFollowerExtractor(CSV_FILE_PATH) extractor.process_followers() 
Enter fullscreen mode Exit fullscreen mode

Top comments (1)

Collapse
 
dragon72463399 profile image
drake

总体来说没啥毛病,就是代码有点冗长