ovo 2024-12-20 16:24:19 +08:00
commit cb6fa04ca7
7 changed files with 617 additions and 0 deletions

.gitignore
@@ -0,0 +1,52 @@
# Compiled class file
*.class

# Eclipse
.project
.classpath
.settings/

# Intellij
*.ipr
*.iml
*.iws
.idea/

# Maven
target/

# Gradle
build
.gradle

# Log file
*.log

# out
**/out/

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar
*.tar.gz
*.rar
*.pid
*.orig

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

# Mac
.DS_Store
*.tmp

main.py
@@ -0,0 +1,40 @@
from recommendation_service.kafka.book_event_consumer import BookEventConsumer
import signal
import sys
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def signal_handler(sig, frame):
    logger.info('Termination signal received, shutting down consumer...')
    consumer.close()
    sys.exit(0)


if __name__ == "__main__":
    logger.info("Starting book recommendation service...")

    # Create the consumer before registering the signal handlers, so the
    # handler always has a valid `consumer` to close (avoids a NameError
    # if a signal arrives before initialization finishes)
    logger.info("Initializing Kafka consumer...")
    consumer = BookEventConsumer(
        bootstrap_servers=['localhost:9092']
    )

    # Register signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        logger.info("Listening for book events...")
        consumer.start_consuming()
    except KeyboardInterrupt:
        logger.info('Interrupt received, shutting down...')
        consumer.close()
    except Exception as e:
        logger.error(f"Service error: {str(e)}")
        consumer.close()
        sys.exit(1)
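
main.py assumes a Kafka broker on localhost:9092 and a book-events topic (see BookEventConsumer below). If the broker is not configured to auto-create topics, a minimal setup sketch using kafka-python's admin client (the partition and replication values are illustrative assumptions, not part of this commit):

from kafka.admin import KafkaAdminClient, NewTopic

# One-off setup step (hypothetical, not part of this commit): create the
# topic the consumer subscribes to, assuming a single local broker.
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='book-events', num_partitions=1,
                              replication_factor=1)])
admin.close()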

recommendation_service/analyzer/preference_analyzer.py
@@ -0,0 +1,229 @@
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict
from datetime import datetime


class PreferenceAnalyzer:
    def __init__(self):
        # Reserved for content-based text similarity; not used yet
        self.vectorizer = TfidfVectorizer()
        # Similarity matrix between book categories
        self.category_similarity = {
            'Sci-Fi': {'Technology': 0.6, 'Literature': 0.3},
            'Technology': {'Sci-Fi': 0.6, 'Education': 0.4},
            'Literature': {'History': 0.4, 'Philosophy': 0.5},
            'History': {'Philosophy': 0.4, 'Literature': 0.4},
            'Philosophy': {'Literature': 0.5, 'History': 0.4},
            'Economics': {'Education': 0.3, 'Technology': 0.3},
            'Education': {'Technology': 0.4, 'Economics': 0.3}
        }

    def analyze_reading_pattern(self, user_history):
        """Analyze a user's reading pattern."""
        if not user_history:
            return {}
        df = pd.DataFrame(user_history)
        patterns = {
            'time_preference': self._analyze_reading_time(df),
            'category_weights': self._calculate_category_weights(df),
            'completion': self._analyze_completion_rate(df),
            'reading_trends': self._analyze_reading_trends(df),
            'recommended_categories': self._get_recommended_categories(df)
        }
        return patterns

    def _analyze_reading_time(self, df):
        """Analyze when the user tends to read."""
        try:
            # Convert timestamps to datetime objects and extract the hour
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['hour'] = df['timestamp'].dt.hour
            # Count reads per hour
            hourly_counts = df['hour'].value_counts().to_dict()
            # Bucket hours into periods of the day; the buckets are
            # disjoint (half-open ranges) so no hour is counted twice
            time_periods = {
                'morning (6-9)': [6, 7, 8],
                'forenoon (9-12)': [9, 10, 11],
                'afternoon (12-18)': [12, 13, 14, 15, 16, 17],
                'evening (18-23)': [18, 19, 20, 21, 22],
                'late night (23-6)': [23, 0, 1, 2, 3, 4, 5]
            }
            period_counts = defaultdict(int)
            for hour, count in hourly_counts.items():
                for period, hours in time_periods.items():
                    if hour in hours:
                        period_counts[period] += count
            # Find the most common reading period
            if period_counts:
                preferred_period = max(period_counts.items(), key=lambda x: x[1])[0]
            else:
                preferred_period = 'unknown'
            return {
                'preferred_period': preferred_period,
                'period_distribution': dict(period_counts)
            }
        except Exception as e:
            return {'error': f'Failed to analyze reading time: {str(e)}'}

    def _calculate_category_weights(self, df):
        """Compute each category's share of the reading history."""
        try:
            category_counts = df['category'].value_counts()
            total_reads = len(df)
            return {cat: count / total_reads for cat, count in category_counts.items()}
        except Exception as e:
            return {'error': f'Failed to compute category weights: {str(e)}'}

    def _analyze_completion_rate(self, df):
        """Analyze completion (progress >= 95% counts as finished)."""
        try:
            if 'progress' in df.columns:
                avg_progress = df['progress'].mean()
                completion_stats = {
                    'avg_progress': round(avg_progress, 2),
                    'completed_books': len(df[df['progress'] >= 95]),
                    'total_books': len(df),
                    'completion_rate': round(len(df[df['progress'] >= 95]) / len(df) * 100, 2)
                }
                return completion_stats
            return {'error': 'no progress data'}
        except Exception as e:
            return {'error': f'Failed to analyze completion rate: {str(e)}'}

    def get_reading_speed(self, df):
        """Compute reading-duration statistics."""
        try:
            if 'duration' in df.columns:
                avg_duration = df['duration'].mean()
                return {
                    'avg_duration': round(avg_duration, 2),
                    'total_duration': df['duration'].sum()
                }
            return {'error': 'no duration data'}
        except Exception as e:
            return {'error': f'Failed to compute reading speed: {str(e)}'}

    def _analyze_reading_trends(self, df):
        """Analyze the reading trend over time."""
        try:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df = df.sort_values('timestamp')
            # Total reading duration per day
            daily_reading = df.groupby(df['timestamp'].dt.date)['duration'].sum()
            # Fit a line through the daily totals; the slope's sign
            # gives the trend direction
            if len(daily_reading) > 1:
                trend = np.polyfit(range(len(daily_reading)), daily_reading.values, 1)[0]
                trend_direction = 'rising' if trend > 0 else 'falling' if trend < 0 else 'stable'
            else:
                trend_direction = 'insufficient data'
            return {
                'trend': trend_direction,
                'daily_avg_duration': round(daily_reading.mean(), 2),
                'max_daily_duration': round(daily_reading.max(), 2)
            }
        except Exception as e:
            return {'error': f'Failed to analyze reading trends: {str(e)}'}

    def _get_recommended_categories(self, df):
        """Recommend categories based on current reading preferences."""
        try:
            # Weight of each category the user currently reads
            current_weights = df['category'].value_counts(normalize=True).to_dict()
            # Propagate each weight to related categories through the
            # similarity matrix
            recommended_weights = defaultdict(float)
            for category, weight in current_weights.items():
                if category in self.category_similarity:
                    for related_cat, similarity in self.category_similarity[category].items():
                        recommended_weights[related_cat] += weight * similarity
            # Damp categories the user already reads frequently
            for category in current_weights:
                if category in recommended_weights:
                    recommended_weights[category] *= 0.5
            # Take the top 3 recommended categories
            top_recommendations = sorted(
                recommended_weights.items(),
                key=lambda x: x[1],
                reverse=True
            )[:3]
            return {
                'categories': [cat for cat, _ in top_recommendations],
                'reasons': self._generate_recommendation_reasons(top_recommendations, current_weights)
            }
        except Exception as e:
            return {'error': f'Failed to generate category recommendations: {str(e)}'}

    def _generate_recommendation_reasons(self, recommendations, current_weights):
        """Generate a human-readable reason for each recommendation."""
        reasons = {}
        for category, score in recommendations:
            if score > 0:
                # Find the currently-read categories most related to this one
                related_categories = []
                for curr_cat in current_weights:
                    if curr_cat in self.category_similarity and \
                            category in self.category_similarity[curr_cat]:
                        related_categories.append(curr_cat)
                if related_categories:
                    reasons[category] = f"Recommended based on your interest in {', '.join(related_categories)}"
                else:
                    reasons[category] = "Broadens your reading"
        return reasons

    def calculate_user_similarity(self, user1_history, user2_history):
        """Compute similarity between two users."""
        try:
            if not user1_history or not user2_history:
                return {
                    'similarity': 0.0,
                    'common_interests': []
                }
            df1 = pd.DataFrame(user1_history)
            df2 = pd.DataFrame(user2_history)
            # Per-user category preference vectors
            cat_weights1 = df1['category'].value_counts(normalize=True)
            cat_weights2 = df2['category'].value_counts(normalize=True)
            # Cosine similarity over the union of categories
            categories = set(cat_weights1.index) | set(cat_weights2.index)
            vec1 = [cat_weights1.get(cat, 0) for cat in categories]
            vec2 = [cat_weights2.get(cat, 0) for cat in categories]
            similarity = cosine_similarity([vec1], [vec2])[0][0]
            # Categories both users read
            common_interests = list(set(cat_weights1.index) & set(cat_weights2.index))
            return {
                'similarity': round(similarity, 2),
                'common_interests': common_interests
            }
        except Exception as e:
            return {
                'error': f'Failed to compute user similarity: {str(e)}',
                'similarity': 0.0,
                'common_interests': []
            }
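
A minimal usage sketch (not part of this commit) showing how the category propagation above plays out; the toy history is invented, with field names matching the records built in BookEventConsumer._handle_read_event:

from recommendation_service.analyzer.preference_analyzer import PreferenceAnalyzer

# Two invented READ records for one user
history = [
    {'book_id': '1001', 'duration': 60, 'progress': 100,
     'category': 'Sci-Fi', 'author': 'Liu Cixin',
     'timestamp': '2024-12-19T21:30:00'},
    {'book_id': '1002', 'duration': 45, 'progress': 40,
     'category': 'Sci-Fi', 'author': 'Liu Cixin',
     'timestamp': '2024-12-20T08:15:00'},
]

analyzer = PreferenceAnalyzer()
patterns = analyzer.analyze_reading_pattern(history)
# With an all-Sci-Fi history the propagated weights are
# Technology = 1.0 * 0.6 and Literature = 1.0 * 0.3, so the top
# recommendations are Technology, then Literature.
print(patterns['recommended_categories'])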

recommendation_service/kafka/book_event_consumer.py
@@ -0,0 +1,155 @@
from kafka import KafkaConsumer
import json
import logging
from ..models.user_profile import UserProfile
from ..analyzer.preference_analyzer import PreferenceAnalyzer

# Configure logging
logger = logging.getLogger(__name__)


class BookEventConsumer:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        """Initialize the Kafka consumer."""
        self.consumer = KafkaConsumer(
            'book-events',  # topic name
            bootstrap_servers=bootstrap_servers,
            group_id='python-recommendation-group',
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        self.user_profiles = {}
        self.analyzer = PreferenceAnalyzer()

    def _handle_read_event(self, user_id, event):
        """Handle a READ event."""
        reading_record = {
            'book_id': event['bookId'],
            'duration': event['readDuration'],
            'progress': event['progress'],
            'category': event['bookInfo']['category'],
            'author': event['bookInfo']['author'],
            'timestamp': event['timestamp']
        }
        self.user_profiles[user_id].update_reading_history(reading_record)
        self.user_profiles[user_id].update_preferences(reading_record)
        logger.info(f"Reading history updated for user {user_id}")
        # Analyze the user's reading pattern
        self._analyze_user_patterns(user_id)

    def _analyze_user_patterns(self, user_id):
        """Analyze the user's patterns and log the results."""
        try:
            # Run the reading-pattern analysis on the user's history
            patterns = self.analyzer.analyze_reading_pattern(
                self.user_profiles[user_id].reading_history
            )
            logger.info(f"\nReading analysis report for user {user_id}:")
            logger.info("=" * 50)
            # Time preference
            if 'time_preference' in patterns:
                logger.info(f"Time preference: {patterns['time_preference']['preferred_period']}")
                logger.info(f"Period distribution: {patterns['time_preference']['period_distribution']}")
            # Category preference
            if 'category_weights' in patterns:
                logger.info("\nCategory preferences:")
                for category, weight in patterns['category_weights'].items():
                    logger.info(f"- {category}: {weight:.2%}")
            # Completion
            if 'completion' in patterns:
                logger.info("\nCompletion stats:")
                for key, value in patterns['completion'].items():
                    logger.info(f"- {key}: {value}")
            # Trends
            if 'reading_trends' in patterns:
                logger.info("\nReading trend analysis:")
                for key, value in patterns['reading_trends'].items():
                    logger.info(f"- {key}: {value}")
            # Recommended categories
            if 'recommended_categories' in patterns:
                logger.info("\nRecommendations:")
                logger.info(f"Recommended categories: {patterns['recommended_categories']['categories']}")
                logger.info("Reasons:")
                for category, reason in patterns['recommended_categories']['reasons'].items():
                    logger.info(f"- {category}: {reason}")
            # User similarity
            self._analyze_user_similarities(user_id)
            logger.info("=" * 50)
        except Exception as e:
            logger.error(f"Failed to analyze user patterns: {str(e)}")

    def _analyze_user_similarities(self, current_user_id):
        """Analyze similarity between users."""
        try:
            if len(self.user_profiles) > 1:
                logger.info("\nUser similarity analysis:")
                for other_user_id, other_profile in self.user_profiles.items():
                    if other_user_id != current_user_id:
                        similarity = self.analyzer.calculate_user_similarity(
                            self.user_profiles[current_user_id].reading_history,
                            other_profile.reading_history
                        )
                        logger.info(f"\nSimilarity with user {other_user_id}:")
                        logger.info(f"- similarity: {similarity['similarity']:.2%}")
                        logger.info(f"- common interests: {', '.join(similarity['common_interests']) if similarity['common_interests'] else 'none'}")
        except Exception as e:
            logger.error(f"Failed to analyze user similarity: {str(e)}")

    def _handle_rate_event(self, user_id, event):
        """Handle a RATE event."""
        self.user_profiles[user_id].update_book_rating(
            event['bookId'],
            event['rating']
        )
        logger.info(f"Rating updated for user {user_id}")

    def _handle_bookmark_event(self, user_id, event):
        """Handle a BOOKMARK event."""
        self.user_profiles[user_id].add_bookmark(event['bookId'])
        logger.info(f"Bookmarks updated for user {user_id}")

    def process_message(self, message):
        """Process a received message."""
        try:
            event = message.value
            logger.info(f"Received event: {event}")
            # Look up the user profile, creating one if needed
            user_id = event['userId']
            if user_id not in self.user_profiles:
                self.user_profiles[user_id] = UserProfile()
            # Dispatch on event type
            event_type = event['eventType']
            if event_type == 'READ':
                self._handle_read_event(user_id, event)
            elif event_type == 'RATE':
                self._handle_rate_event(user_id, event)
            elif event_type == 'BOOKMARK':
                self._handle_bookmark_event(user_id, event)
        except Exception as e:
            logger.error(f"Error while processing message: {str(e)}")

    def start_consuming(self):
        """Start consuming messages."""
        logger.info("Starting to consume Kafka messages...")
        try:
            for message in self.consumer:
                self.process_message(message)
        except Exception as e:
            logger.error(f"Error while consuming messages: {str(e)}")

    def close(self):
        """Close the consumer."""
        self.consumer.close()
        logger.info("Consumer closed")

recommendation_service/models/user_profile.py
@@ -0,0 +1,28 @@
class UserProfile:
    def __init__(self):
        self.reading_history = []
        self.category_preferences = {}
        self.reading_time_pattern = {}
        self.favorite_authors = set()
        self.book_ratings = {}
        self.bookmarks = set()

    def update_book_rating(self, book_id, rating):
        """Update the rating for a book."""
        self.book_ratings[book_id] = rating

    def add_bookmark(self, book_id):
        """Add a bookmark."""
        self.bookmarks.add(book_id)

    def update_reading_history(self, reading_record):
        """Append a record to the reading history."""
        self.reading_history.append(reading_record)

    def update_preferences(self, reading_record):
        """Update the user's reading preferences from a record."""
        category = reading_record['category']
        self.category_preferences[category] = \
            self.category_preferences.get(category, 0) + 1
        self.favorite_authors.add(reading_record['author'])
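
The profile is driven entirely by the consumer above; a quick sketch of the lifecycle with invented values:

profile = UserProfile()
record = {'book_id': '1001', 'duration': 60, 'progress': 100,
          'category': 'Sci-Fi', 'author': 'Liu Cixin',
          'timestamp': '2024-12-20T08:15:00'}
profile.update_reading_history(record)
profile.update_preferences(record)   # category_preferences: {'Sci-Fi': 1}
profile.update_book_rating('1001', 5)
profile.add_bookmark('1001')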

recommendation_service/utils/data_generator.py
@@ -0,0 +1,79 @@
import random
import json
from datetime import datetime, timedelta


class TestDataGenerator:
    def __init__(self):
        self.categories = ['Sci-Fi', 'Literature', 'History', 'Technology',
                           'Philosophy', 'Economics', 'Education']
        self.authors = ['Liu Cixin', 'Yu Hua', 'Keigo Higashino',
                        'Haruki Murakami', 'Zhang San', 'Li Si', 'Wang Wu']
        self.event_types = ['READ', 'RATE', 'BOOKMARK']

    def generate_book(self):
        """Generate basic info for one book."""
        return {
            'bookId': str(random.randint(1000, 9999)),
            'title': f'TestBook_{random.randint(1, 100)}',
            'author': random.choice(self.authors),
            'category': random.choice(self.categories),
            'pageCount': random.randint(100, 500)
        }

    def generate_random_time(self):
        """Generate a random timestamp within the past 30 days."""
        # Pick a random day in the past 30 days
        days_ago = random.randint(0, 30)
        base_date = datetime.now() - timedelta(days=days_ago)
        # Pick a random time of day
        hour = random.randint(0, 23)
        minute = random.randint(0, 59)
        second = random.randint(0, 59)
        # Combine date and time
        random_time = base_date.replace(
            hour=hour,
            minute=minute,
            second=second
        )
        return random_time

    def generate_event(self, user_id=None):
        """Generate one random event."""
        if user_id is None:
            user_id = str(random.randint(1, 3))
        event_type = random.choice(self.event_types)
        book = self.generate_book()
        timestamp = self.generate_random_time()
        event = {
            'eventId': str(random.randint(10000, 99999)),
            'eventType': event_type,
            'userId': user_id,
            'timestamp': timestamp.isoformat(),
            'bookId': book['bookId'],
            'bookInfo': book
        }
        # Add type-specific fields
        if event_type == 'READ':
            event['readDuration'] = random.randint(10, 180)  # reading duration (minutes)
            event['progress'] = random.randint(1, 100)  # reading progress (%)
        elif event_type == 'RATE':
            event['rating'] = random.randint(1, 5)  # rating, 1-5
        return event

    def generate_events(self, num_events=10):
        """Generate multiple events."""
        events = []
        # Generate events for each of the 3 assumed users
        for user_id in range(1, 4):
            user_events = [self.generate_event(str(user_id))
                           for _ in range(num_events // 3)]
            events.extend(user_events)
        # Shuffle the event order
        random.shuffle(events)
        return events

(test script; filename not shown in this view, and the sys.path shim below suggests it lives one directory below the project root)
@@ -0,0 +1,34 @@
import sys
import os

# Make the project root importable when running this script directly
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from recommendation_service.utils.data_generator import TestDataGenerator
from recommendation_service.kafka.book_event_consumer import BookEventConsumer
from kafka import KafkaProducer
import json
import time


def test_recommendation_system():
    # Create a Kafka producer
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )
    # Create the test data generator
    data_generator = TestDataGenerator()
    # Generate test events
    test_events = data_generator.generate_events(num_events=20)
    # Send the test events to Kafka
    for event in test_events:
        producer.send('book-events', event)
        print(f"Sent event: {event}")
        time.sleep(1)  # one event per second
    producer.flush()
    producer.close()


if __name__ == "__main__":
    test_recommendation_system()
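
To exercise the pipeline end to end (assuming a Kafka broker is already running on localhost:9092): start the consumer with python main.py in one terminal, then run this script in another. After each READ event the consumer should log an analysis report like the one assembled in _analyze_user_patterns.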