from kafka import KafkaConsumer
import json
import logging

from ..models.user_profile import UserProfile
from ..analyzer.preference_analyzer import PreferenceAnalyzer

# Module-level logger; handlers and levels are configured by the application.
logger = logging.getLogger(__name__)


class BookEventConsumer:
    """Consume book events from Kafka and maintain per-user reading profiles.

    Handles READ / RATE / BOOKMARK events from the ``book-events`` topic,
    updates the matching ``UserProfile``, and logs preference-analysis
    reports produced by ``PreferenceAnalyzer``.
    """

    def __init__(self, bootstrap_servers=None):
        """Initialize the Kafka consumer and in-memory state.

        Args:
            bootstrap_servers: Kafka broker addresses. Defaults to
                ``['localhost:9092']``. A ``None`` sentinel is used instead
                of a mutable list default, which would be shared across calls.
        """
        if bootstrap_servers is None:
            bootstrap_servers = ['localhost:9092']
        self.consumer = KafkaConsumer(
            'book-events',  # topic name
            bootstrap_servers=bootstrap_servers,
            group_id='python-recommendation-group',
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        )
        # user_id -> UserProfile; in-memory only, lost on restart.
        self.user_profiles = {}
        self.analyzer = PreferenceAnalyzer()

    def _handle_read_event(self, user_id, event):
        """Record a READ event and refresh the user's preference analysis.

        Assumes ``event`` carries bookId/readDuration/progress/bookInfo/
        timestamp keys as produced upstream — raises KeyError otherwise
        (caught and logged by process_message).
        """
        reading_record = {
            'book_id': event['bookId'],
            'duration': event['readDuration'],
            'progress': event['progress'],
            'category': event['bookInfo']['category'],
            'author': event['bookInfo']['author'],
            'timestamp': event['timestamp'],
        }
        profile = self.user_profiles[user_id]
        profile.update_reading_history(reading_record)
        profile.update_preferences(reading_record)
        logger.info(f"用户 {user_id} 的阅读记录已更新")
        # Re-analyze the user's reading patterns after every READ event.
        self._analyze_user_patterns(user_id)

    def _analyze_user_patterns(self, user_id):
        """Analyze the user's reading patterns and log a human-readable report.

        Best-effort: any analysis failure is logged (with traceback) and
        swallowed so the consumer loop keeps running.
        """
        try:
            patterns = self.analyzer.analyze_reading_pattern(
                self.user_profiles[user_id].reading_history
            )
            logger.info(f"\n用户 {user_id} 的阅读分析报告:")
            logger.info("=" * 50)

            # Preferred reading time slots.
            if '偏好时段' in patterns:
                logger.info(f"时间偏好: {patterns['偏好时段']['最佳阅读时段']}")
                logger.info(f"时段分布: {patterns['偏好时段']['时段分布']}")

            # Category preference weights.
            if '分类权重' in patterns:
                logger.info("\n分类偏好:")
                for category, weight in patterns['分类权重'].items():
                    logger.info(f"- {category}: {weight:.2%}")

            # Reading completion statistics.
            if '阅读完成度' in patterns:
                logger.info("\n阅读完成情况:")
                for key, value in patterns['阅读完成度'].items():
                    logger.info(f"- {key}: {value}")

            # Reading trend analysis.
            if '阅读趋势' in patterns:
                logger.info("\n阅读趋势分析:")
                for key, value in patterns['阅读趋势'].items():
                    logger.info(f"- {key}: {value}")

            # Recommended categories with per-category reasons.
            if '推荐类别' in patterns:
                logger.info("\n推荐类别:")
                logger.info(f"推荐类别: {patterns['推荐类别']['推荐类别']}")
                logger.info("推荐理由:")
                for category, reason in patterns['推荐类别']['推荐理由'].items():
                    logger.info(f"- {category}: {reason}")

            # Cross-user similarity report.
            self._analyze_user_similarities(user_id)

            logger.info("=" * 50)
        except Exception as e:
            # logger.exception keeps the original message and adds the
            # traceback, which logger.error(f"... {e}") discarded.
            logger.exception("分析用户模式时出错: %s", e)

    def _analyze_user_similarities(self, current_user_id):
        """Log similarity between the current user and every other known user.

        Skipped entirely while only one profile exists. Best-effort: errors
        are logged with traceback and swallowed.
        """
        try:
            if len(self.user_profiles) > 1:
                logger.info("\n用户相似度分析:")
                current_history = self.user_profiles[current_user_id].reading_history
                for other_user_id, other_profile in self.user_profiles.items():
                    if other_user_id == current_user_id:
                        continue  # don't compare a user against themselves
                    similarity = self.analyzer.calculate_user_similarity(
                        current_history,
                        other_profile.reading_history,
                    )
                    logger.info(f"\n与用户 {other_user_id} 的相似度:")
                    logger.info(f"- 相似度指数: {similarity['相似度']:.2%}")
                    logger.info(f"- 共同兴趣: {', '.join(similarity['共同兴趣']) if similarity['共同兴趣'] else '暂无'}")
        except Exception as e:
            logger.exception("分析用户相似度时出错: %s", e)

    def _handle_rate_event(self, user_id, event):
        """Record a RATE event on the user's profile."""
        self.user_profiles[user_id].update_book_rating(
            event['bookId'],
            event['rating'],
        )
        logger.info(f"用户 {user_id} 的评分记录已更新")

    def _handle_bookmark_event(self, user_id, event):
        """Record a BOOKMARK event on the user's profile."""
        self.user_profiles[user_id].add_bookmark(event['bookId'])
        logger.info(f"用户 {user_id} 的收藏记录已更新")

    def process_message(self, message):
        """Dispatch one Kafka message to the handler for its event type.

        Creates a ``UserProfile`` lazily for first-seen users. Unknown event
        types are ignored; any handler error is logged (with traceback) so
        the consumer loop keeps running.
        """
        try:
            event = message.value
            logger.info(f"收到事件: {event}")

            # Lazily create a profile for first-seen users.
            user_id = event['userId']
            if user_id not in self.user_profiles:
                self.user_profiles[user_id] = UserProfile()

            # Dispatch table replaces the if/elif chain; unknown event
            # types fall through silently, same as before.
            handlers = {
                'READ': self._handle_read_event,
                'RATE': self._handle_rate_event,
                'BOOKMARK': self._handle_bookmark_event,
            }
            handler = handlers.get(event['eventType'])
            if handler is not None:
                handler(user_id, event)
        except Exception as e:
            logger.exception("处理消息时发生错误: %s", e)

    def start_consuming(self):
        """Block indefinitely, consuming and processing Kafka messages."""
        logger.info("开始消费Kafka消息...")
        try:
            for message in self.consumer:
                self.process_message(message)
        except Exception as e:
            logger.exception("消费消息时发生错误: %s", e)

    def close(self):
        """Close the underlying Kafka consumer."""
        self.consumer.close()
        logger.info("消费者已关闭")