# test_publish_slot.py

  1. import json
  2. import redis
  3. # Redis 连接
  4. r = redis.Redis(
  5. host="text.skin",
  6. port=6379,
  7. db=0,
  8. password="STEs2x6ML0U1HlpE9SojM6YU7QPhqzY8",
  9. decode_responses=True
  10. )
  11. # 使用的键名(原 Channel 名称)
  12. key = "vs:signal:slot.lon.fr.tourist"
  13. # 消息体
  14. message = {
  15. "group_id": "tls.gb.fr",
  16. "apt_type": {
  17. "weight": 10,
  18. "routing_key": "slot.lon.fr.tourist",
  19. "city": "London",
  20. "visa_type": "Tourist",
  21. "country": "France"
  22. },
  23. "query_result": {
  24. "routing_key": "slot.lon.fr.tourist",
  25. "country": "France",
  26. "city": "London",
  27. "visa_type": "Tourist",
  28. "availability_status": "Available",
  29. "earliest_date": "2026-06-05",
  30. "availability": [
  31. {
  32. "date": "2026-06-05",
  33. "times": [
  34. {
  35. "time": "14:00",
  36. "label": ""
  37. }
  38. ]
  39. }
  40. ],
  41. "snapshot_source": "worker",
  42. "snapshot_at": "2026-05-02T14:23:45.123456+00:00"
  43. },
  44. "timestamp": 1777745853
  45. }
  46. # 写入 Redis 键,有效期 30 秒
  47. r.setex(key, 30, json.dumps(message, ensure_ascii=False))
  48. print(f"写入成功 -> 键: {key}")
  49. print(f"有效期: 30 秒")