Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- import os
- import redis
- import json
- from os.path import join, dirname
- from dotenv import load_dotenv
- from random import choice
- from datetime import datetime
- load_dotenv()
- host = os.environ.get("REDIS_PROXIES_HOST")
- port = os.environ.get("REDIS_PROXIES_PORT")
- password = os.environ.get("REDIS_PROXIES_PASSWORD")
- db = os.environ.get("REDIS_PROXIES_DB")
- sidekiq_queues = redis.Redis(host=host, port=port, db=db, password=password)
- queue = 'my_queue'
- job = 'MyJob'
- arguments = 123456789
- value = {
- "class": job,
- "queue": queue,
- "args": [arguments],
- "retry": 'true',
- "jid": ''.join([choice('qwertyuiopasdfghjklzxcvbnm1234567890') for i in range(24)]),
- "created_at": datetime.now().timestamp(),
- "enqueued_at": datetime.now().timestamp()}
- sidekiq_queues.sadd(f"queues", queue)
- sidekiq_queues.lpush(f"queue:{queue}" , json.dumps(value))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement