import json
import logging
import time
from datetime import datetime
from typing import Optional

from kafka import KafkaConsumer
from kafka.errors import KafkaError

from config import config

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('Kafka Consumer')


class Consumer:
    """Kafka consumer base class.

    Polls messages from a single topic in batches and hands them to
    ``batch_message_jobs``; subclasses override that method to plug in
    their own business logic (DB inserts, API calls, ...).
    """

    def __init__(self,
                 topic: str = 'bg_spider_flow_news',
                 group_id: str = 'python-consumer-group-1'):
        """Initialize consumer state; no connection is opened yet.

        Args:
            topic: Kafka topic to consume. Defaults to the previously
                hard-coded value for backward compatibility.
            group_id: consumer-group id (also reused as the client id).
        """
        self.consumer: Optional[KafkaConsumer] = None
        self.message_count: int = 0          # total messages seen so far
        self.start_time: Optional[datetime] = None
        self.bootstrap_servers = config.KAFKA_SERVERS
        self.topic = topic
        self.group_id = group_id

    def _create_consumer(self):
        """Create the KafkaConsumer, retrying the connection up to 10 times.

        Raises:
            Exception: the last connection error, if all 10 attempts fail.
        """
        attempts = 0
        logger.info("开始创建Kafka消费者...")
        while self.consumer is None:
            attempts += 1
            try:
                self.consumer = KafkaConsumer(
                    self.topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.group_id,
                    client_id=self.group_id,
                    auto_offset_reset='earliest',
                    # enable_auto_commit=True,
                    value_deserializer=lambda x: x.decode('utf-8') if x else None,
                    key_deserializer=lambda x: x.decode('utf-8') if x else None,
                    # consumer_timeout_ms would make iteration give up when the
                    # topic is empty, so it is deliberately left disabled.
                    # consumer_timeout_ms=1000
                )
                logger.info("✅ 成功创建Kafka消费者")
            except Exception as e:
                if attempts < 10:
                    logger.warning(f"⚠️ 第 {attempts} 次尝试创建Kafka消费者失败,重试中...")
                    time.sleep(2)
                else:
                    logger.error(f"❌ 创建Kafka消费者失败: {e}")
                    raise

    def start_consuming(self,
                        max_messages: Optional[int] = None,
                        show_stats_interval: int = 100,
                        batch_size: int = 10,
                        batch_timeout_ms: int = 5000):
        """Consume the topic in batches until ``max_messages`` is reached.

        Args:
            max_messages: stop after this many messages (None = run forever).
            show_stats_interval: emit throughput statistics roughly every
                N messages.
            batch_size: maximum records fetched per poll.
            batch_timeout_ms: poll timeout in milliseconds.

        Note: the limit is checked after each batch, so slightly more than
        ``max_messages`` messages may be processed.
        """
        self._create_consumer()
        if not self.consumer:
            logger.error("❌ 消费者未初始化,无法开始消费")
            return
        self.start_time = datetime.now()
        logger.info("=" * 60)
        logger.info("🚀 开始消费Kafka消息")
        logger.info(f"⏰ 开始时间: {self.start_time}")
        logger.info(f"📦 批量大小: {batch_size}, 超时时间: {batch_timeout_ms}ms")
        logger.info("=" * 60)
        logger.info(f"⏳ 等待消费 Topic '{self.topic}' 的消息...")
        try:
            while True:
                # Batch fetch: {TopicPartition: [ConsumerRecord, ...]}.
                message_batch = self.consumer.poll(
                    timeout_ms=batch_timeout_ms,
                    max_records=batch_size,
                )
                if not message_batch:
                    logger.info("⏳ 等待新消息...")
                    time.sleep(2)
                    continue
                prev_count = self.message_count
                batch_messages = []
                for _topic_partition, messages in message_batch.items():
                    for message in messages:
                        self.message_count += 1
                        batch_messages.append(message)
                if batch_messages:
                    logger.info(f"🔥 获取到 {len(batch_messages)} 条消息,开始批量处理")
                    self._process_message_batch(batch_messages)
                # Fix: a batch can jump the count past an exact multiple of
                # the interval, so `count % interval == 0` could never fire.
                # Detect the interval-boundary crossing instead.
                if prev_count // show_stats_interval != self.message_count // show_stats_interval:
                    self._show_statistics()
                if max_messages and self.message_count >= max_messages:
                    logger.info(f"✅ 已消费{max_messages}条消息,停止消费")
                    break
        except KafkaError as e:
            logger.error(f"❌ Kafka错误: {e}")
        except Exception as e:
            logger.error(f"❌ 消费过程中发生错误: {e}")
        finally:
            # Fix: always release the broker connection instead of leaking it.
            self.consumer.close()

    def _process_message_batch(self, messages):
        """Format a batch of raw records and forward the valid ones.

        Args:
            messages: list of kafka ConsumerRecord objects.
        """
        try:
            logger.info(f"开始批量处理 {len(messages)} 条消息")
            formatted_messages = []
            for i, message in enumerate(messages):
                # message_count already includes this batch, so back-compute
                # the 1-based index of the current record.
                logger.info(
                    f"处理第 {self.message_count - len(messages) + i + 1} 条消息,"
                    f"分区: {message.partition}, 偏移量: {message.offset}"
                )
                formatted_message = self.format_message(message.value)
                if formatted_message:
                    formatted_messages.append(formatted_message)
            if formatted_messages:
                self.batch_message_jobs(formatted_messages)
                logger.info(f"✅ 批量处理完成,成功处理 {len(formatted_messages)} 条有效消息")
        except Exception as e:
            logger.error(f"❌ 批量处理消息失败: {e}")

    def batch_message_jobs(self, messages):
        """Business hook for a batch of parsed messages; override in subclasses.

        Args:
            messages: list of dicts produced by ``format_message``.
        """
        try:
            logger.info(f"📦 批量处理 {len(messages)} 条消息")
            # Default implementation only logs each message; real handlers
            # (bulk DB insert, API fan-out, ...) go in a subclass override.
            for i, message in enumerate(messages, 1):
                logger.info(f"批量消息 {i}: {message}")
        except Exception as e:
            logger.error(f"❌ 批量消费者处理失败: {e}")

    def _show_statistics(self):
        """Log the running message count and consumption rate."""
        if self.start_time:
            elapsed = datetime.now() - self.start_time
            rate = self.message_count / elapsed.total_seconds() if elapsed.total_seconds() > 0 else 0
            logger.info(f"📊 统计信息: 已消费消息: {self.message_count} 消费速率: {rate:.2f} 条/秒")

    def format_message(self, message_value):
        """Parse a raw message value into a dict.

        Args:
            message_value: decoded string payload (may be None/empty).

        Returns:
            dict on successful JSON parse, otherwise None (non-JSON text is
            logged truncated to 100 characters; empty payloads are logged).
        """
        message = None
        try:
            if message_value:
                try:
                    message = json.loads(message_value)
                except json.JSONDecodeError:
                    logger.info(f" 📝 文本内容: {str(message_value)[:100]}{'...' if len(str(message_value)) > 100 else ''}")
            else:
                logger.info(" ⚪ 空消息")
        except Exception as e:
            logger.error(f"❌ 业务逻辑处理失败: {e}")
        return message
Top comments (0)