from collections import defaultdict
from datetime import datetime

import numpy as np
import pandas as pd

try:
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
except ImportError:
    # scikit-learn is treated as optional: in this module it only backs the
    # ``vectorizer`` attribute, and the similarity maths below uses numpy.
    # The names are kept so code importing them from here still resolves.
    TfidfVectorizer = None
    cosine_similarity = None

# Half-open, non-overlapping hour buckets. Every hour 0-23 belongs to exactly
# one period so that a read is never counted twice. The labels are runtime
# strings that appear in the returned dictionaries.
_TIME_PERIODS = {
    '早晨 (6-9)': (6, 7, 8),
    '上午 (9-12)': (9, 10, 11),
    '下午 (12-18)': (12, 13, 14, 15, 16, 17),
    '晚上 (18-23)': (18, 19, 20, 21, 22),
    '深夜 (23-6)': (23, 0, 1, 2, 3, 4, 5),
}
_HOUR_TO_PERIOD = {
    hour: period for period, hours in _TIME_PERIODS.items() for hour in hours
}

# A record counts as "finished" once its progress reaches this value.
_COMPLETION_THRESHOLD = 95
# Factor applied to a recommended category the user already reads.
_ALREADY_READ_PENALTY = 0.5
# Number of categories returned by the recommender.
_MAX_RECOMMENDATIONS = 3
# Relative tolerance under which a fitted slope is reported as flat.
_TREND_TOLERANCE = 1e-9


def _cosine(vec_a, vec_b):
    """Return the cosine similarity of two equal-length numeric sequences.

    Returns 0.0 when either vector has zero length or zero norm, so the
    caller never divides by zero.
    """
    a = np.asarray(vec_a, dtype=float)
    b = np.asarray(vec_b, dtype=float)
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0:
        return 0.0
    return float(np.dot(a, b) / denominator)


class PreferenceAnalyzer:
    """Derive reading-preference statistics from per-user history records.

    The methods read the columns ``timestamp``, ``category``, ``progress``
    and ``duration`` from the history. Each analysis step catches its own
    exceptions and reports them as ``{'错误': message}`` instead of raising.
    """

    def __init__(self):
        # ``vectorizer`` is not used by any method in this class; it is kept
        # as an attribute for callers and is None without scikit-learn.
        self.vectorizer = TfidfVectorizer() if TfidfVectorizer is not None else None
        # Directed relatedness between categories:
        # ``category_similarity[a][b]`` is how strongly reading ``a``
        # suggests ``b``.
        self.category_similarity = {
            '科幻': {'科技': 0.6, '文学': 0.3},
            '科技': {'科幻': 0.6, '教育': 0.4},
            '文学': {'历史': 0.4, '哲学': 0.5},
            '历史': {'哲学': 0.4, '文学': 0.4},
            '哲学': {'文学': 0.5, '历史': 0.4},
            '经济': {'教育': 0.3, '科技': 0.3},
            '教育': {'科技': 0.4, '经济': 0.3}
        }

    def analyze_reading_pattern(self, user_history):
        """Run every analysis step over one user's history.

        Args:
            user_history: Records accepted by ``pd.DataFrame``.

        Returns:
            ``{}`` when ``user_history`` is falsy; otherwise a dict with one
            entry per analysis step. A step that failed contributes a
            ``{'错误': ...}`` dict as its value.
        """
        if not user_history:
            return {}

        df = pd.DataFrame(user_history)
        return {
            '偏好时段': self._analyze_reading_time(df),
            '分类权重': self._calculate_category_weights(df),
            '阅读完成度': self._analyze_completion_rate(df),
            '阅读趋势': self._analyze_reading_trends(df),
            '推荐类别': self._get_recommended_categories(df)
        }

    def _analyze_reading_time(self, df):
        """Count reads per time-of-day period and pick the busiest one.

        Each record falls into exactly one period, so the counts in
        ``时段分布`` add up to the number of records with a valid timestamp.
        The input frame is not modified.
        """
        try:
            hours = pd.to_datetime(df['timestamp']).dt.hour
            hourly_counts = hours.value_counts().to_dict()

            period_counts = defaultdict(int)
            for hour, count in hourly_counts.items():
                period = _HOUR_TO_PERIOD.get(int(hour))
                if period is not None:
                    period_counts[period] += count

            if period_counts:
                preferred_period = max(period_counts.items(), key=lambda x: x[1])[0]
            else:
                preferred_period = "未知"

            return {
                '最佳阅读时段': preferred_period,
                '时段分布': dict(period_counts)
            }
        except Exception as e:
            return {'错误': f'分析阅读时间时出错: {str(e)}'}

    def _calculate_category_weights(self, df):
        """Return each category's share of all rows in ``df``."""
        try:
            category_counts = df['category'].value_counts()
            # Divide by the row count, not the non-null count, so rows with
            # a missing category still count towards the total.
            total_reads = len(df)
            return {cat: count / total_reads for cat, count in category_counts.items()}
        except Exception as e:
            return {'错误': f'计算类别权重时出错: {str(e)}'}

    def _analyze_completion_rate(self, df):
        """Summarise the ``progress`` column.

        Returns mean progress, the number of records at or above the
        completion threshold, the record count, and the completion
        percentage, or an error dict when there is no ``progress`` column.
        """
        try:
            if 'progress' not in df.columns:
                return {'错误': '没有进度数据'}

            total_books = len(df)
            completed_books = int((df['progress'] >= _COMPLETION_THRESHOLD).sum())
            return {
                '平均进度': round(df['progress'].mean(), 2),
                '已完成书籍': completed_books,
                '总书籍数': total_books,
                '完成率': round(completed_books / total_books * 100, 2)
            }
        except Exception as e:
            return {'错误': f'分析完成率时出错: {str(e)}'}

    def get_reading_speed(self, df):
        """Return the mean and the sum of the ``duration`` column.

        Returns an error dict when ``df`` has no ``duration`` column.
        """
        try:
            if 'duration' not in df.columns:
                return {'错误': '没有时长数据'}

            return {
                '平均阅读时长': round(df['duration'].mean(), 2),
                '总阅读时间': df['duration'].sum()
            }
        except Exception as e:
            return {'错误': f'计算阅读速度时出错: {str(e)}'}

    def _analyze_reading_trends(self, df):
        """Classify the direction of the daily total reading duration.

        A straight line is fitted through the per-day totals, indexed by
        their position in date order. The slope's sign gives the trend, and
        a slope within a small tolerance of zero is reported as flat.
        The input frame is not modified.
        """
        try:
            ordered = df.assign(
                timestamp=pd.to_datetime(df['timestamp'])
            ).sort_values('timestamp')

            daily_reading = ordered.groupby(ordered['timestamp'].dt.date)['duration'].sum()

            if len(daily_reading) > 1:
                values = daily_reading.to_numpy(dtype=float)
                slope = np.polyfit(np.arange(len(values)), values, 1)[0]
                # A least-squares fit of constant data rarely gives a slope
                # of exactly 0.0, so compare against a tolerance scaled to
                # the magnitude of the data.
                tolerance = _TREND_TOLERANCE * max(1.0, float(np.abs(values).max()))
                if abs(slope) <= tolerance:
                    trend_direction = '稳定'
                elif slope > 0:
                    trend_direction = '上升'
                else:
                    trend_direction = '下降'
            else:
                trend_direction = '数据不足'

            return {
                '阅读趋势': trend_direction,
                '日均阅读时长': round(daily_reading.mean(), 2),
                '最长阅读时长': round(daily_reading.max(), 2)
            }
        except Exception as e:
            return {'错误': f'分析阅读趋势时出错: {str(e)}'}

    def _get_recommended_categories(self, df):
        """Recommend up to three categories related to what the user reads.

        Each category the user reads passes its share of reads, scaled by
        the similarity value, to its related categories. Categories the user
        already reads are down-weighted, not removed.
        """
        try:
            current_weights = df['category'].value_counts(normalize=True).to_dict()

            recommended_weights = defaultdict(float)
            for category, weight in current_weights.items():
                related = self.category_similarity.get(category, {})
                for related_cat, similarity in related.items():
                    recommended_weights[related_cat] += weight * similarity

            for category in current_weights:
                if category in recommended_weights:
                    recommended_weights[category] *= _ALREADY_READ_PENALTY

            top_recommendations = sorted(
                recommended_weights.items(),
                key=lambda x: x[1],
                reverse=True
            )[:_MAX_RECOMMENDATIONS]

            return {
                '推荐类别': [cat for cat, _ in top_recommendations],
                '推荐理由': self._generate_recommendation_reasons(
                    top_recommendations, current_weights
                )
            }
        except Exception as e:
            return {'错误': f'生成类别推荐时出错: {str(e)}'}

    def _generate_recommendation_reasons(self, recommendations, current_weights):
        """Build a reason string for each recommendation with a positive score.

        Args:
            recommendations: Iterable of ``(category, score)`` pairs.
            current_weights: Mapping whose keys are the categories the user
                currently reads.

        Returns:
            Dict mapping each recommended category to its reason text.
        """
        reasons = {}
        for category, score in recommendations:
            if score <= 0:
                continue

            # Currently-read categories that list this category as related.
            related_categories = [
                curr_cat for curr_cat in current_weights
                if category in self.category_similarity.get(curr_cat, {})
            ]

            if related_categories:
                reasons[category] = f"基于您对{','.join(related_categories)}的兴趣推荐"
            else:
                reasons[category] = "扩展阅读领域"
        return reasons

    def calculate_user_similarity(self, user1_history, user2_history):
        """Compare two users by the cosine similarity of their category shares.

        Args:
            user1_history: Records for the first user, accepted by
                ``pd.DataFrame``.
            user2_history: Records for the second user.

        Returns:
            Dict with ``相似度`` (float rounded to two decimals) and
            ``共同兴趣`` (categories both users read, ordered by the first
            user's frequency). On failure the dict also has an ``错误`` entry
            and the other two hold 0.0 and ``[]``.
        """
        try:
            if not user1_history or not user2_history:
                return {
                    '相似度': 0.0,
                    '共同兴趣': []
                }

            df1 = pd.DataFrame(user1_history)
            df2 = pd.DataFrame(user2_history)

            weights1 = df1['category'].value_counts(normalize=True).to_dict()
            weights2 = df2['category'].value_counts(normalize=True).to_dict()

            # Put both users' weights on the same category axis.
            categories = list(weights1) + [cat for cat in weights2 if cat not in weights1]
            vec1 = [weights1.get(cat, 0) for cat in categories]
            vec2 = [weights2.get(cat, 0) for cat in categories]

            similarity = _cosine(vec1, vec2)

            # Keep the first user's order so the output is deterministic.
            common_interests = [cat for cat in weights1 if cat in weights2]

            return {
                '相似度': round(similarity, 2),
                '共同兴趣': common_interests
            }
        except Exception as e:
            return {
                '错误': f'计算用户相似度时出错: {str(e)}',
                '相似度': 0.0,
                '共同兴趣': []
            }