book-python/tests/test_recommendation.py

34 lines
1001 B
Python

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from recommendation_service.utils.data_generator import TestDataGenerator
from recommendation_service.kafka.book_event_consumer import BookEventConsumer
from kafka import KafkaProducer
import json
import time
def test_recommendation_system():
# 创建Kafka生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
# 创建测试数据生成器
data_generator = TestDataGenerator()
# 生成测试事件
test_events = data_generator.generate_events(num_events=20)
# 发送测试事件到Kafka
for event in test_events:
producer.send('book-events', event)
print(f"发送事件: {event}")
time.sleep(1) # 每秒发送一个事件
producer.flush()
producer.close()
if __name__ == "__main__":
test_recommendation_system()