# book-python/recommendation_service/analyzer/preference_analyzer.py
#
# Source-viewer metadata: 229 lines, 9.0 KiB, Python
# Timestamp shown by the viewer: 2024-12-20 16:24:19 +08:00
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict
from datetime import datetime
class PreferenceAnalyzer:
    """Derive reading-preference statistics from user reading histories.

    A history is passed to ``pd.DataFrame``; the analyses read the columns
    ``timestamp``, ``category``, ``progress`` and ``duration``.  Every
    analysis is best-effort: if it fails (for example because a column is
    missing) it returns a dict with a single ``'错误'`` entry instead of
    raising.
    """

    # Period label -> hours belonging to it.  The ranges are half-open so
    # that every hour 0-23 belongs to exactly one period; overlapping
    # boundaries would count the same reading session twice.
    _TIME_PERIODS = (
        ('早晨 (6-9)', range(6, 9)),
        ('上午 (9-12)', range(9, 12)),
        ('下午 (12-18)', range(12, 18)),
        ('晚上 (18-23)', range(18, 23)),
        ('深夜 (23-6)', (23, 0, 1, 2, 3, 4, 5)),
    )

    def __init__(self):
        self.vectorizer = TfidfVectorizer()
        # Relatedness between categories: category -> {related category: weight}.
        self.category_similarity = {
            '科幻': {'科技': 0.6, '文学': 0.3},
            '科技': {'科幻': 0.6, '教育': 0.4},
            '文学': {'历史': 0.4, '哲学': 0.5},
            '历史': {'哲学': 0.4, '文学': 0.4},
            '哲学': {'文学': 0.5, '历史': 0.4},
            '经济': {'教育': 0.3, '科技': 0.3},
            '教育': {'科技': 0.4, '经济': 0.3},
        }

    @staticmethod
    def _to_native(value):
        """Convert a NumPy scalar to the matching built-in Python number.

        NumPy integers are not ``int`` instances and are rejected by the
        standard ``json`` encoder, so results are normalised before being
        returned.  Non-NumPy values are passed through untouched.
        """
        return value.item() if isinstance(value, np.generic) else value

    def analyze_reading_pattern(self, user_history):
        """Run every analysis on one user's history.

        Args:
            user_history: Records convertible by ``pd.DataFrame``.

        Returns:
            ``{}`` when ``user_history`` is falsy, otherwise a dict with the
            keys ``'偏好时段'``, ``'分类权重'``, ``'阅读完成度'``,
            ``'阅读趋势'`` and ``'推荐类别'``, each holding the result of the
            corresponding helper.
        """
        if not user_history:
            return {}
        df = pd.DataFrame(user_history)
        return {
            '偏好时段': self._analyze_reading_time(df),
            '分类权重': self._calculate_category_weights(df),
            '阅读完成度': self._analyze_completion_rate(df),
            '阅读趋势': self._analyze_reading_trends(df),
            '推荐类别': self._get_recommended_categories(df),
        }

    def _analyze_reading_time(self, df):
        """Find the period of the day in which the user reads most often.

        Returns:
            ``{'最佳阅读时段': label, '时段分布': {label: count}}``; the label
            is ``"未知"`` when no timestamp could be counted.  On failure a
            dict with an ``'错误'`` message.
        """
        try:
            # Work on a derived Series so the caller's frame is not modified.
            hours = pd.to_datetime(df['timestamp']).dt.hour
            hour_to_period = {
                hour: label
                for label, period_hours in self._TIME_PERIODS
                for hour in period_hours
            }
            period_counts = defaultdict(int)
            # value_counts() drops missing hours (unparseable timestamps).
            for hour, count in hours.value_counts().items():
                period_counts[hour_to_period[int(hour)]] += int(count)
            if period_counts:
                preferred_period = max(
                    period_counts.items(), key=lambda item: item[1]
                )[0]
            else:
                preferred_period = "未知"
            return {
                '最佳阅读时段': preferred_period,
                '时段分布': dict(period_counts),
            }
        except Exception as e:
            return {'错误': f'分析阅读时间时出错: {str(e)}'}

    def _calculate_category_weights(self, df):
        """Return each category's share of all records in ``df``.

        The denominator is the total number of rows, so rows without a
        category lower the shares of the others.
        """
        try:
            category_counts = df['category'].value_counts()
            total_reads = len(df)
            return {
                cat: float(count / total_reads)
                for cat, count in category_counts.items()
            }
        except Exception as e:
            return {'错误': f'计算类别权重时出错: {str(e)}'}

    def _analyze_completion_rate(self, df):
        """Summarise reading progress; a record counts as finished at >= 95.

        Returns:
            A dict with average progress, number of finished records, total
            records and the completion rate in percent, or an ``'错误'``
            dict when there is no ``progress`` column or the analysis fails.
        """
        try:
            if 'progress' not in df.columns:
                return {'错误': '没有进度数据'}
            total_books = len(df)
            finished_books = int((df['progress'] >= 95).sum())
            return {
                '平均进度': round(float(df['progress'].mean()), 2),
                '已完成书籍': finished_books,
                '总书籍数': total_books,
                '完成率': round(finished_books / total_books * 100, 2),
            }
        except Exception as e:
            return {'错误': f'分析完成率时出错: {str(e)}'}

    def get_reading_speed(self, df):
        """Return the mean and the sum of the ``duration`` column.

        Returns:
            ``{'平均阅读时长': mean, '总阅读时间': total}`` as built-in Python
            numbers, or an ``'错误'`` dict when there is no ``duration``
            column or the computation fails.
        """
        try:
            if 'duration' not in df.columns:
                return {'错误': '没有时长数据'}
            return {
                '平均阅读时长': round(float(df['duration'].mean()), 2),
                '总阅读时间': self._to_native(df['duration'].sum()),
            }
        except Exception as e:
            return {'错误': f'计算阅读速度时出错: {str(e)}'}

    def _analyze_reading_trends(self, df):
        """Classify how the per-day total reading duration develops.

        A straight line is fitted through the daily totals (days are treated
        as equally spaced, in date order) and its slope decides the trend.

        Returns:
            A dict with the trend label, the mean daily total and the largest
            daily total, or an ``'错误'`` dict on failure.
        """
        try:
            # Group on a derived Series so the caller's frame is not modified;
            # groupby sorts the date keys, giving chronological order.
            timestamps = pd.to_datetime(df['timestamp'])
            daily_reading = df['duration'].groupby(timestamps.dt.date).sum()
            if len(daily_reading) > 1:
                slope = np.polyfit(
                    range(len(daily_reading)), daily_reading.values, 1
                )[0]
                # The fit is floating point: a flat series rarely yields an
                # exact 0, so compare with a tolerance.
                if np.isclose(slope, 0.0, atol=1e-9):
                    trend_direction = '稳定'
                elif slope > 0:
                    trend_direction = '上升'
                else:
                    trend_direction = '下降'
            else:
                trend_direction = '数据不足'
            return {
                '阅读趋势': trend_direction,
                '日均阅读时长': round(float(daily_reading.mean()), 2),
                '最长阅读时长': round(self._to_native(daily_reading.max()), 2),
            }
        except Exception as e:
            return {'错误': f'分析阅读趋势时出错: {str(e)}'}

    def _get_recommended_categories(self, df):
        """Suggest up to three categories related to what the user reads.

        Each category the user reads contributes ``share * similarity`` to
        its related categories; categories the user already reads have their
        score halved.

        Returns:
            ``{'推荐类别': [category, ...], '推荐理由': {category: reason}}``
            or an ``'错误'`` dict on failure.
        """
        try:
            current_weights = df['category'].value_counts(normalize=True).to_dict()
            recommended_weights = defaultdict(float)
            for category, weight in current_weights.items():
                related = self.category_similarity.get(category, {})
                for related_cat, similarity in related.items():
                    recommended_weights[related_cat] += weight * similarity
            # Down-weight (rather than drop) categories already being read.
            for category in current_weights:
                if category in recommended_weights:
                    recommended_weights[category] *= 0.5
            top_recommendations = sorted(
                recommended_weights.items(),
                key=lambda item: item[1],
                reverse=True,
            )[:3]
            return {
                '推荐类别': [cat for cat, _ in top_recommendations],
                '推荐理由': self._generate_recommendation_reasons(
                    top_recommendations, current_weights
                ),
            }
        except Exception as e:
            return {'错误': f'生成类别推荐时出错: {str(e)}'}

    def _generate_recommendation_reasons(self, recommendations, current_weights):
        """Build a human-readable reason for each recommended category.

        Args:
            recommendations: Iterable of ``(category, score)`` pairs.
            current_weights: Mapping whose keys are the categories the user
                currently reads.

        Returns:
            ``{category: reason}`` for every recommendation with a positive
            score.
        """
        reasons = {}
        for category, score in recommendations:
            if score <= 0:
                continue
            # Categories the user reads that list this one as related.
            related_categories = [
                curr_cat
                for curr_cat in current_weights
                if category in self.category_similarity.get(curr_cat, {})
            ]
            if related_categories:
                reasons[category] = f"基于您对{','.join(related_categories)}的兴趣推荐"
            else:
                reasons[category] = "扩展阅读领域"
        return reasons

    def calculate_user_similarity(self, user1_history, user2_history):
        """Compare two users by the cosine similarity of their category shares.

        Returns:
            ``{'相似度': float, '共同兴趣': [category, ...]}``.  Both values are
            zero/empty when either history is falsy; on failure the dict
            additionally carries an ``'错误'`` message.
        """
        try:
            if not user1_history or not user2_history:
                return {
                    '相似度': 0.0,
                    '共同兴趣': [],
                }
            cat_weights1 = pd.DataFrame(user1_history)['category'].value_counts(normalize=True)
            cat_weights2 = pd.DataFrame(user2_history)['category'].value_counts(normalize=True)
            categories1 = set(cat_weights1.index)
            categories2 = set(cat_weights2.index)
            # One shared ordering so both vectors use the same axes.
            categories = list(categories1 | categories2)
            vec1 = [cat_weights1.get(cat, 0) for cat in categories]
            vec2 = [cat_weights2.get(cat, 0) for cat in categories]
            similarity = cosine_similarity([vec1], [vec2])[0][0]
            return {
                '相似度': round(float(similarity), 2),
                # Sorted so the result does not depend on set iteration order.
                '共同兴趣': sorted(categories1 & categories2),
            }
        except Exception as e:
            return {
                '错误': f'计算用户相似度时出错: {str(e)}',
                '相似度': 0.0,
                '共同兴趣': [],
            }