book-python/recommendation_service/kafka/book_event_consumer.py

155 lines
6.3 KiB
Python
Raw Normal View History

2024-12-20 16:24:19 +08:00
from kafka import KafkaConsumer
import json
import logging
from ..models.user_profile import UserProfile
from ..analyzer.preference_analyzer import PreferenceAnalyzer
# 配置日志
logger = logging.getLogger(__name__)
class BookEventConsumer:
def __init__(self, bootstrap_servers=['localhost:9092']):
"""初始化Kafka消费者"""
self.consumer = KafkaConsumer(
'book-events', # topic名称
bootstrap_servers=bootstrap_servers,
group_id='python-recommendation-group',
auto_offset_reset='earliest',
enable_auto_commit=True,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
self.user_profiles = {}
self.analyzer = PreferenceAnalyzer()
def _handle_read_event(self, user_id, event):
"""处理阅读事件"""
reading_record = {
'book_id': event['bookId'],
'duration': event['readDuration'],
'progress': event['progress'],
'category': event['bookInfo']['category'],
'author': event['bookInfo']['author'],
'timestamp': event['timestamp']
}
self.user_profiles[user_id].update_reading_history(reading_record)
self.user_profiles[user_id].update_preferences(reading_record)
logger.info(f"用户 {user_id} 的阅读记录已更新")
# 分析用户阅读模式
self._analyze_user_patterns(user_id)
def _analyze_user_patterns(self, user_id):
"""分析用户模式并输出结果"""
try:
# 获取用户的阅读模式分析
patterns = self.analyzer.analyze_reading_pattern(
self.user_profiles[user_id].reading_history
)
logger.info(f"\n用户 {user_id} 的阅读分析报告:")
logger.info("=" * 50)
# 输出时间偏好
if '偏好时段' in patterns:
logger.info(f"时间偏好: {patterns['偏好时段']['最佳阅读时段']}")
logger.info(f"时段分布: {patterns['偏好时段']['时段分布']}")
# 输出分类偏好
if '分类权重' in patterns:
logger.info("\n分类偏好:")
for category, weight in patterns['分类权重'].items():
logger.info(f"- {category}: {weight:.2%}")
# 输出阅读完成度
if '阅读完成度' in patterns:
logger.info("\n阅读完成情况:")
for key, value in patterns['阅读完成度'].items():
logger.info(f"- {key}: {value}")
# 输出阅读趋势
if '阅读趋势' in patterns:
logger.info("\n阅读趋势分析:")
for key, value in patterns['阅读趋势'].items():
logger.info(f"- {key}: {value}")
# 输出推荐类别
if '推荐类别' in patterns:
logger.info("\n推荐类别:")
logger.info(f"推荐类别: {patterns['推荐类别']['推荐类别']}")
logger.info("推荐理由:")
for category, reason in patterns['推荐类别']['推荐理由'].items():
logger.info(f"- {category}: {reason}")
# 分析用户相似度
self._analyze_user_similarities(user_id)
logger.info("=" * 50)
except Exception as e:
logger.error(f"分析用户模式时出错: {str(e)}")
def _analyze_user_similarities(self, current_user_id):
"""分析用户间的相似度"""
try:
if len(self.user_profiles) > 1:
logger.info("\n用户相似度分析:")
for other_user_id, other_profile in self.user_profiles.items():
if other_user_id != current_user_id:
similarity = self.analyzer.calculate_user_similarity(
self.user_profiles[current_user_id].reading_history,
other_profile.reading_history
)
logger.info(f"\n与用户 {other_user_id} 的相似度:")
logger.info(f"- 相似度指数: {similarity['相似度']:.2%}")
logger.info(f"- 共同兴趣: {', '.join(similarity['共同兴趣']) if similarity['共同兴趣'] else '暂无'}")
except Exception as e:
logger.error(f"分析用户相似度时出错: {str(e)}")
def _handle_rate_event(self, user_id, event):
"""处理评分事件"""
self.user_profiles[user_id].update_book_rating(
event['bookId'],
event['rating']
)
logger.info(f"用户 {user_id} 的评分记录已更新")
def _handle_bookmark_event(self, user_id, event):
"""处理收藏事件"""
self.user_profiles[user_id].add_bookmark(event['bookId'])
logger.info(f"用户 {user_id} 的收藏记录已更新")
def process_message(self, message):
"""处理接收到的消息"""
try:
event = message.value
logger.info(f"收到事件: {event}")
# 获取用户画像,如果不存在则创建
user_id = event['userId']
if user_id not in self.user_profiles:
self.user_profiles[user_id] = UserProfile()
# 根据事件类型处理
event_type = event['eventType']
if event_type == 'READ':
self._handle_read_event(user_id, event)
elif event_type == 'RATE':
self._handle_rate_event(user_id, event)
elif event_type == 'BOOKMARK':
self._handle_bookmark_event(user_id, event)
except Exception as e:
logger.error(f"处理消息时发生错误: {str(e)}")
def start_consuming(self):
"""开始消费消息"""
logger.info("开始消费Kafka消息...")
try:
for message in self.consumer:
self.process_message(message)
except Exception as e:
logger.error(f"消费消息时发生错误: {str(e)}")
def close(self):
"""关闭消费者"""
self.consumer.close()
logger.info("消费者已关闭")