From cb6fa04ca7a07293c784524041b74d90beaf1c15 Mon Sep 17 00:00:00 2001 From: ovo Date: Fri, 20 Dec 2024 16:24:19 +0800 Subject: [PATCH] demo --- .gitignore | 52 ++++ main.py | 40 +++ .../analyzer/preference_analyzer.py | 229 ++++++++++++++++++ .../kafka/book_event_consumer.py | 155 ++++++++++++ recommendation_service/models/user_profile.py | 28 +++ .../utils/data_generator.py | 79 ++++++ tests/test_recommendation.py | 34 +++ 7 files changed, 617 insertions(+) create mode 100644 .gitignore create mode 100644 main.py create mode 100644 recommendation_service/analyzer/preference_analyzer.py create mode 100644 recommendation_service/kafka/book_event_consumer.py create mode 100644 recommendation_service/models/user_profile.py create mode 100644 recommendation_service/utils/data_generator.py create mode 100644 tests/test_recommendation.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0a933ca --- /dev/null +++ b/.gitignore @@ -0,0 +1,52 @@ +# Compiled class file +*.class + +# Eclipse +.project +.classpath +.settings/ + +# Intellij +*.ipr +*.iml +*.iws +.idea/ + +# Maven +target/ + +# Gradle +build +.gradle + +# Log file +*.log + +# out +**/out/ + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar +*.tar.gz +*.rar +*.pid +*.orig + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# Mac +.DS_Store + +*.tmp diff --git a/main.py b/main.py new file mode 100644 index 0000000..5b0129b --- /dev/null +++ b/main.py @@ -0,0 +1,40 @@ +from recommendation_service.kafka.book_event_consumer import BookEventConsumer +import signal +import sys +import logging + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +def signal_handler(sig, frame): + logger.info('接收到终止信号,正在关闭消费者...') + consumer.close() + sys.exit(0) + +if __name__ == "__main__": + logger.info("启动图书推荐服务...") + + # 注册信号处理器 + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # 创建并启动消费者 + logger.info("初始化Kafka消费者...") + consumer = BookEventConsumer( + bootstrap_servers=['localhost:9092'] + ) + + try: + logger.info("开始监听图书事件...") + consumer.start_consuming() + except KeyboardInterrupt: + logger.info('接收到中断信号,正在关闭...') + consumer.close() + except Exception as e: + logger.error(f"服务运行出错: {str(e)}") + consumer.close() + sys.exit(1) \ No newline at end of file diff --git a/recommendation_service/analyzer/preference_analyzer.py b/recommendation_service/analyzer/preference_analyzer.py new file mode 100644 index 0000000..80c2354 --- /dev/null +++ b/recommendation_service/analyzer/preference_analyzer.py @@ -0,0 +1,229 @@ +import pandas as pd +import numpy as np +from sklearn.feature_extraction.text import TfidfVectorizer +from sklearn.metrics.pairwise import cosine_similarity +from collections import defaultdict +from datetime import datetime + + +class PreferenceAnalyzer: + def __init__(self): + self.vectorizer = TfidfVectorizer() + # 定义类别之间的相关性矩阵 + self.category_similarity = { + '科幻': {'科技': 0.6, '文学': 0.3}, + '科技': {'科幻': 0.6, '教育': 0.4}, + '文学': {'历史': 0.4, '哲学': 0.5}, + '历史': {'哲学': 0.4, '文学': 0.4}, + '哲学': {'文学': 0.5, '历史': 0.4}, + '经济': {'教育': 0.3, '科技': 0.3}, + '教育': {'科技': 0.4, '经济': 0.3} + } + + def analyze_reading_pattern(self, user_history): + """分析用户阅读模式""" + if not user_history: + return {} + + df = pd.DataFrame(user_history) + + patterns = { + '偏好时段': self._analyze_reading_time(df), + '分类权重': self._calculate_category_weights(df), + '阅读完成度': self._analyze_completion_rate(df), + '阅读趋势': self._analyze_reading_trends(df), + '推荐类别': self._get_recommended_categories(df) + } + return patterns + + def _analyze_reading_time(self, df): + """分析用户的阅读时间模式""" + try: + # 将时间戳转换为datetime对象 + df['timestamp'] = pd.to_datetime(df['timestamp']) + + # 提取小时 + df['hour'] = df['timestamp'].dt.hour + + # 统计每个小时段的阅读次数 + hourly_counts = df['hour'].value_counts().to_dict() + + # 将小时分成时段 + time_periods = { + '早晨 (6-9)': [6, 7, 8, 9], + '上午 (9-12)': [9, 10, 11, 12], + '下午 (12-18)': [12, 13, 14, 15, 16, 17, 18], + '晚上 (18-23)': [18, 19, 20, 21, 22, 23], + '深夜 (23-6)': [23, 0, 1, 2, 3, 4, 5] + } + + period_counts = defaultdict(int) + for hour, count in hourly_counts.items(): + for period, hours in time_periods.items(): + if hour in hours: + period_counts[period] += count + + # 找出最常阅读的时段 + if period_counts: + preferred_period = max(period_counts.items(), key=lambda x: x[1])[0] + else: + preferred_period = "未知" + + return { + '最佳阅读时段': preferred_period, + '时段分布': dict(period_counts) + } + + except Exception as e: + return {'错误': f'分析阅读时间时出错: {str(e)}'} + + def _calculate_category_weights(self, df): + """计算各类别权重""" + try: + category_counts = df['category'].value_counts() + total_reads = len(df) + return {cat: count / total_reads for cat, count in category_counts.items()} + except Exception as e: + return {'错误': f'计算类别权重时出错: {str(e)}'} + + def _analyze_completion_rate(self, df): + """分析完成率""" + try: + if 'progress' in df.columns: + avg_progress = df['progress'].mean() + completion_stats = { + '平均进度': round(avg_progress, 2), + '已完成书籍': len(df[df['progress'] >= 95]), + '总书籍数': len(df), + '完成率': round(len(df[df['progress'] >= 95]) / len(df) * 100, 2) + } + return completion_stats + return {'错误': '没有进度数据'} + except Exception as e: + return {'错误': f'分析完成率时出错: {str(e)}'} + + def get_reading_speed(self, df): + """计算阅读速度""" + try: + if 'duration' in df.columns: + avg_duration = df['duration'].mean() + return { + '平均阅读时长': round(avg_duration, 2), + '总阅读时间': df['duration'].sum() + } + return {'错误': '没有时长数据'} + except Exception as e: + return {'错误': f'计算阅读速度时出错: {str(e)}'} + + def _analyze_reading_trends(self, df): + """分析阅读趋势""" + try: + df['timestamp'] = pd.to_datetime(df['timestamp']) + df = df.sort_values('timestamp') + + # 计算每天的阅读时长 + daily_reading = df.groupby(df['timestamp'].dt.date)['duration'].sum() + + # 计算趋势 + if len(daily_reading) > 1: + trend = np.polyfit(range(len(daily_reading)), daily_reading.values, 1)[0] + trend_direction = '上升' if trend > 0 else '下降' if trend < 0 else '稳定' + else: + trend_direction = '数据不足' + + return { + '阅读趋势': trend_direction, + '日均阅读时长': round(daily_reading.mean(), 2), + '最长阅读时长': round(daily_reading.max(), 2) + } + except Exception as e: + return {'错误': f'分析阅读趋势时出错: {str(e)}'} + + def _get_recommended_categories(self, df): + """基于当前阅读偏好推荐类别""" + try: + # 获取当前阅读类别的权重 + current_weights = df['category'].value_counts(normalize=True).to_dict() + + # 计算推荐权重 + recommended_weights = defaultdict(float) + for category, weight in current_weights.items(): + # 考虑当前类别的相关类别 + if category in self.category_similarity: + for related_cat, similarity in self.category_similarity[category].items(): + recommended_weights[related_cat] += weight * similarity + + # 排除已经经常阅读的类别 + for category in current_weights: + if category in recommended_weights: + recommended_weights[category] *= 0.5 + + # 获取top3推荐类别 + top_recommendations = sorted( + recommended_weights.items(), + key=lambda x: x[1], + reverse=True + )[:3] + + return { + '推荐类别': [cat for cat, _ in top_recommendations], + '推荐理由': self._generate_recommendation_reasons(top_recommendations, current_weights) + } + except Exception as e: + return {'错误': f'生成类别推荐时出错: {str(e)}'} + + def _generate_recommendation_reasons(self, recommendations, current_weights): + """生成推荐理由""" + reasons = {} + for category, score in recommendations: + if score > 0: + # 找出与该类别最相关的当前阅读类别 + related_categories = [] + for curr_cat in current_weights: + if curr_cat in self.category_similarity and \ + category in self.category_similarity[curr_cat]: + related_categories.append(curr_cat) + + if related_categories: + reasons[category] = f"基于您对{','.join(related_categories)}的兴趣推荐" + else: + reasons[category] = "扩展阅读领域" + + return reasons + + def calculate_user_similarity(self, user1_history, user2_history): + """计算用户相似度""" + try: + if not user1_history or not user2_history: + return { + '相似度': 0.0, + '共同兴趣': [] + } + + df1 = pd.DataFrame(user1_history) + df2 = pd.DataFrame(user2_history) + + # 计算类别偏好相似度 + cat_weights1 = df1['category'].value_counts(normalize=True) + cat_weights2 = df2['category'].value_counts(normalize=True) + + # 使用余弦相似度计算 + categories = set(cat_weights1.index) | set(cat_weights2.index) + vec1 = [cat_weights1.get(cat, 0) for cat in categories] + vec2 = [cat_weights2.get(cat, 0) for cat in categories] + + similarity = cosine_similarity([vec1], [vec2])[0][0] + + # 计算共同兴趣 + common_interests = list(set(cat_weights1.index) & set(cat_weights2.index)) + + return { + '相似度': round(similarity, 2), + '共同兴趣': common_interests + } + except Exception as e: + return { + '错误': f'计算用户相似度时出错: {str(e)}', + '相似度': 0.0, + '共同兴趣': [] + } \ No newline at end of file diff --git a/recommendation_service/kafka/book_event_consumer.py b/recommendation_service/kafka/book_event_consumer.py new file mode 100644 index 0000000..9e78a7c --- /dev/null +++ b/recommendation_service/kafka/book_event_consumer.py @@ -0,0 +1,155 @@ +from kafka import KafkaConsumer +import json +import logging +from ..models.user_profile import UserProfile +from ..analyzer.preference_analyzer import PreferenceAnalyzer + +# 配置日志 +logger = logging.getLogger(__name__) + +class BookEventConsumer: + def __init__(self, bootstrap_servers=['localhost:9092']): + """初始化Kafka消费者""" + self.consumer = KafkaConsumer( + 'book-events', # topic名称 + bootstrap_servers=bootstrap_servers, + group_id='python-recommendation-group', + auto_offset_reset='earliest', + enable_auto_commit=True, + value_deserializer=lambda x: json.loads(x.decode('utf-8')) + ) + self.user_profiles = {} + self.analyzer = PreferenceAnalyzer() + + def _handle_read_event(self, user_id, event): + """处理阅读事件""" + reading_record = { + 'book_id': event['bookId'], + 'duration': event['readDuration'], + 'progress': event['progress'], + 'category': event['bookInfo']['category'], + 'author': event['bookInfo']['author'], + 'timestamp': event['timestamp'] + } + self.user_profiles[user_id].update_reading_history(reading_record) + self.user_profiles[user_id].update_preferences(reading_record) + logger.info(f"用户 {user_id} 的阅读记录已更新") + + # 分析用户阅读模式 + self._analyze_user_patterns(user_id) + + def _analyze_user_patterns(self, user_id): + """分析用户模式并输出结果""" + try: + # 获取用户的阅读模式分析 + patterns = self.analyzer.analyze_reading_pattern( + self.user_profiles[user_id].reading_history + ) + logger.info(f"\n用户 {user_id} 的阅读分析报告:") + logger.info("=" * 50) + + # 输出时间偏好 + if '偏好时段' in patterns: + logger.info(f"时间偏好: {patterns['偏好时段']['最佳阅读时段']}") + logger.info(f"时段分布: {patterns['偏好时段']['时段分布']}") + + # 输出分类偏好 + if '分类权重' in patterns: + logger.info("\n分类偏好:") + for category, weight in patterns['分类权重'].items(): + logger.info(f"- {category}: {weight:.2%}") + + # 输出阅读完成度 + if '阅读完成度' in patterns: + logger.info("\n阅读完成情况:") + for key, value in patterns['阅读完成度'].items(): + logger.info(f"- {key}: {value}") + + # 输出阅读趋势 + if '阅读趋势' in patterns: + logger.info("\n阅读趋势分析:") + for key, value in patterns['阅读趋势'].items(): + logger.info(f"- {key}: {value}") + + # 输出推荐类别 + if '推荐类别' in patterns: + logger.info("\n推荐类别:") + logger.info(f"推荐类别: {patterns['推荐类别']['推荐类别']}") + logger.info("推荐理由:") + for category, reason in patterns['推荐类别']['推荐理由'].items(): + logger.info(f"- {category}: {reason}") + + # 分析用户相似度 + self._analyze_user_similarities(user_id) + + logger.info("=" * 50) + + except Exception as e: + logger.error(f"分析用户模式时出错: {str(e)}") + + def _analyze_user_similarities(self, current_user_id): + """分析用户间的相似度""" + try: + if len(self.user_profiles) > 1: + logger.info("\n用户相似度分析:") + for other_user_id, other_profile in self.user_profiles.items(): + if other_user_id != current_user_id: + similarity = self.analyzer.calculate_user_similarity( + self.user_profiles[current_user_id].reading_history, + other_profile.reading_history + ) + logger.info(f"\n与用户 {other_user_id} 的相似度:") + logger.info(f"- 相似度指数: {similarity['相似度']:.2%}") + logger.info(f"- 共同兴趣: {', '.join(similarity['共同兴趣']) if similarity['共同兴趣'] else '暂无'}") + except Exception as e: + logger.error(f"分析用户相似度时出错: {str(e)}") + + def _handle_rate_event(self, user_id, event): + """处理评分事件""" + self.user_profiles[user_id].update_book_rating( + event['bookId'], + event['rating'] + ) + logger.info(f"用户 {user_id} 的评分记录已更新") + + def _handle_bookmark_event(self, user_id, event): + """处理收藏事件""" + self.user_profiles[user_id].add_bookmark(event['bookId']) + logger.info(f"用户 {user_id} 的收藏记录已更新") + + def process_message(self, message): + """处理接收到的消息""" + try: + event = message.value + logger.info(f"收到事件: {event}") + + # 获取用户画像,如果不存在则创建 + user_id = event['userId'] + if user_id not in self.user_profiles: + self.user_profiles[user_id] = UserProfile() + + # 根据事件类型处理 + event_type = event['eventType'] + if event_type == 'READ': + self._handle_read_event(user_id, event) + elif event_type == 'RATE': + self._handle_rate_event(user_id, event) + elif event_type == 'BOOKMARK': + self._handle_bookmark_event(user_id, event) + + except Exception as e: + logger.error(f"处理消息时发生错误: {str(e)}") + + def start_consuming(self): + """开始消费消息""" + logger.info("开始消费Kafka消息...") + try: + for message in self.consumer: + self.process_message(message) + except Exception as e: + logger.error(f"消费消息时发生错误: {str(e)}") + + def close(self): + """关闭消费者""" + self.consumer.close() + logger.info("消费者已关闭") \ No newline at end of file diff --git a/recommendation_service/models/user_profile.py b/recommendation_service/models/user_profile.py new file mode 100644 index 0000000..0f8cd57 --- /dev/null +++ b/recommendation_service/models/user_profile.py @@ -0,0 +1,28 @@ +class UserProfile: + def __init__(self): + self.reading_history = [] + self.category_preferences = {} + self.reading_time_pattern = {} + self.favorite_authors = set() + self.book_ratings = {} + self.bookmarks = set() + + def update_book_rating(self, book_id, rating): + """更新书籍评分""" + self.book_ratings[book_id] = rating + + def add_bookmark(self, book_id): + """添加书签""" + self.bookmarks.add(book_id) + + def update_reading_history(self, reading_record): + """更新阅读历史""" + self.reading_history.append(reading_record) + + def update_preferences(self, reading_record): + """更新用户阅读偏好""" + category = reading_record['category'] + self.category_preferences[category] = \ + self.category_preferences.get(category, 0) + 1 + + self.favorite_authors.add(reading_record['author']) \ No newline at end of file diff --git a/recommendation_service/utils/data_generator.py b/recommendation_service/utils/data_generator.py new file mode 100644 index 0000000..e4a61a0 --- /dev/null +++ b/recommendation_service/utils/data_generator.py @@ -0,0 +1,79 @@ +import random +import json +from datetime import datetime, timedelta + +class TestDataGenerator: + def __init__(self): + self.categories = ['科幻', '文学', '历史', '科技', '哲学', '经济', '教育'] + self.authors = ['刘慈欣', '余华', '东野圭吾', '村上春树', '张三', '李四', '王五'] + self.event_types = ['READ', 'RATE', 'BOOKMARK'] + + def generate_book(self): + """生成一本书的基本信息""" + return { + 'bookId': str(random.randint(1000, 9999)), + 'title': f'测试书籍_{random.randint(1, 100)}', + 'author': random.choice(self.authors), + 'category': random.choice(self.categories), + 'pageCount': random.randint(100, 500) + } + + def generate_random_time(self): + """生成随机时间""" + # 随机生成过去30天内的某一天 + days_ago = random.randint(0, 30) + base_date = datetime.now() - timedelta(days=days_ago) + + # 随机生成一天中的时间 + hour = random.randint(0, 23) + minute = random.randint(0, 59) + second = random.randint(0, 59) + + # 组合日期和时间 + random_time = base_date.replace( + hour=hour, + minute=minute, + second=second + ) + + return random_time + + def generate_event(self, user_id=None): + """生成一个随机事件""" + if user_id is None: + user_id = str(random.randint(1, 3)) + + event_type = random.choice(self.event_types) + book = self.generate_book() + timestamp = self.generate_random_time() + + event = { + 'eventId': str(random.randint(10000, 99999)), + 'eventType': event_type, + 'userId': user_id, + 'timestamp': timestamp.isoformat(), + 'bookId': book['bookId'], + 'bookInfo': book + } + + # 根据事件类型添加特定字段 + if event_type == 'READ': + event['readDuration'] = random.randint(10, 180) # 阅读时长(分钟) + event['progress'] = random.randint(1, 100) # 阅读进度 + elif event_type == 'RATE': + event['rating'] = random.randint(1, 5) # 评分 1-5 + + return event + + def generate_events(self, num_events=10): + """生成多个事件""" + events = [] + # 为每个用户生成一些事件 + for user_id in range(1, 4): # 假设有3个用户 + user_events = [self.generate_event(str(user_id)) + for _ in range(num_events // 3)] + events.extend(user_events) + + # 随机打乱事件顺序 + random.shuffle(events) + return events \ No newline at end of file diff --git a/tests/test_recommendation.py b/tests/test_recommendation.py new file mode 100644 index 0000000..35fb788 --- /dev/null +++ b/tests/test_recommendation.py @@ -0,0 +1,34 @@ +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from recommendation_service.utils.data_generator import TestDataGenerator +from recommendation_service.kafka.book_event_consumer import BookEventConsumer +from kafka import KafkaProducer +import json +import time + +def test_recommendation_system(): + # 创建Kafka生产者 + producer = KafkaProducer( + bootstrap_servers=['localhost:9092'], + value_serializer=lambda x: json.dumps(x).encode('utf-8') + ) + + # 创建测试数据生成器 + data_generator = TestDataGenerator() + + # 生成测试事件 + test_events = data_generator.generate_events(num_events=20) + + # 发送测试事件到Kafka + for event in test_events: + producer.send('book-events', event) + print(f"发送事件: {event}") + time.sleep(1) # 每秒发送一个事件 + + producer.flush() + producer.close() + +if __name__ == "__main__": + test_recommendation_system() \ No newline at end of file