import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from recommendation_service.utils.data_generator import TestDataGenerator from recommendation_service.kafka.book_event_consumer import BookEventConsumer from kafka import KafkaProducer import json import time def test_recommendation_system(): # 创建Kafka生产者 producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda x: json.dumps(x).encode('utf-8') ) # 创建测试数据生成器 data_generator = TestDataGenerator() # 生成测试事件 test_events = data_generator.generate_events(num_events=20) # 发送测试事件到Kafka for event in test_events: producer.send('book-events', event) print(f"发送事件: {event}") time.sleep(1) # 每秒发送一个事件 producer.flush() producer.close() if __name__ == "__main__": test_recommendation_system()