Consume with Python
This guide provides practical examples for consuming messages from Hooque queues using Python.
Polling (REST API)
The Pull model is ideal for workers that process messages at their own pace.
Manual Ack: this is the most reliable way to consume messages. You acknowledge (Ack) each message yourself, and only after it has been processed successfully.
import requests
import json
import time
# Obtain this from your consumer configuration in the Hooque dashboard.
# The worker below appends "/next" to this base URL to pull messages.
QUEUE_URL = "https://app.hooque.io/queues/cons_xxxxxxxx"
# Placeholder consumer token; sent as the Bearer token in the Authorization header.
TOKEN = "your_consumer_token"
def worker_loop():
    """Poll the queue forever, acknowledging each message after processing.

    Each iteration pulls one message from ``{QUEUE_URL}/next`` and runs the
    processing step. On success it POSTs to the message's ``ackUrl``; if the
    processing step raises, it POSTs to ``nackUrl`` with the error text as
    ``reason``. Any other failure (network error, timeout, invalid JSON,
    missing metadata, failed ack/nack call) is printed and the loop resumes
    after a 5 second pause. This function never returns.
    """
    headers = {"Authorization": f"Bearer {TOKEN}"}
    next_url = f"{QUEUE_URL}/next"
    # requests has no default timeout: without one, a stalled connection
    # would block the worker forever.
    request_timeout = 30  # seconds

    print(f"Starting worker for {QUEUE_URL}...")
    while True:
        try:
            # 1. Pull the next message
            response = requests.get(next_url, headers=headers, timeout=request_timeout)
            if response.status_code != 200:
                # Queue is empty or error, wait a bit
                time.sleep(1)
                continue

            payload = response.json()
            # Delivery metadata (message id, ack/nack/reject URLs) arrives as
            # JSON in a response header.
            meta = json.loads(response.headers.get("X-Hooque-Meta", "{}"))

            try:
                # 2. Process your business logic here
                print(f"Processing message {meta.get('messageId')}: {payload}")
            except Exception as e:
                print(f"Error processing message: {e}")
                # 4. Failure: Negative Acknowledge (Nack)
                # This will cause the message to be retried according to your queue policy.
                requests.post(
                    meta["nackUrl"],
                    json={"reason": str(e)},
                    headers=headers,
                    timeout=request_timeout,
                )
                # Note: You can use meta["rejectUrl"] if the failure is permanent
                # and you don't want to retry (e.g., invalid data format).
            else:
                # 3. Success: Acknowledge (Ack) to delete message from queue.
                # Kept outside the try body so that a failed ack call is not
                # mistaken for a processing failure and nacked.
                requests.post(meta["ackUrl"], headers=headers, timeout=request_timeout)
        except Exception as e:
            print(f"Worker polling error: {e}")
            time.sleep(5)
if __name__ == "__main__":
    # Option A: Run directly (blocks main thread)
    worker_loop()
    # Option B: Run in a background thread
    # (a daemon thread is stopped when the main thread exits, hence the
    # keep-alive loop on the last line)
    # import threading
    # threading.Thread(target=worker_loop, daemon=True).start()
    # while True: time.sleep(1)
Auto Ack: with `autoAck=true`, Hooque deletes the message immediately upon delivery. Use this for non-critical tasks where "at-least-once" delivery isn't required.
import requests
import time
# Base URL of the consumer; the worker below appends "/next" to pull messages.
QUEUE_URL = "https://app.hooque.io/queues/cons_xxxxxxxx"
# Placeholder consumer token; sent as the Bearer token in the Authorization header.
TOKEN = "your_consumer_token"
def worker_loop():
    """Poll the queue forever with automatic acknowledgement.

    Each iteration requests ``{QUEUE_URL}/next`` with ``autoAck=true`` and
    runs the processing step on the returned payload. No ack or nack call is
    made. Any error (network failure, timeout, invalid JSON, processing
    exception) is printed and the loop resumes after a 5 second pause.
    This function never returns.
    """
    headers = {"Authorization": f"Bearer {TOKEN}"}
    next_url = f"{QUEUE_URL}/next"
    params = {"autoAck": "true"}
    # requests has no default timeout: without one, a stalled connection
    # would block the worker forever.
    request_timeout = 30  # seconds

    print(f"Starting worker (Auto Ack) for {QUEUE_URL}...")
    while True:
        try:
            response = requests.get(
                next_url,
                headers=headers,
                params=params,
                timeout=request_timeout,
            )
            if response.status_code != 200:
                # Nothing to process (or an error response): back off briefly.
                time.sleep(1)
                continue

            payload = response.json()
            # Process your business logic here.
            # Message is already deleted from the queue.
            print(f"Processing: {payload}")
        except Exception as e:
            print(f"Worker error: {e}")
            time.sleep(5)
if __name__ == "__main__":
    # Run the auto-ack worker when executed as a script (blocks the main thread).
    worker_loop()
Async Polling
import asyncio
import httpx
import json
async def async_worker():
    """Poll the queue forever using a single shared ``httpx.AsyncClient``.

    Each iteration pulls one message from ``{QUEUE_URL}/next`` and runs the
    processing step. On success it POSTs to the message's ``ackUrl``; if the
    processing step raises, it POSTs to ``nackUrl`` with the error text as
    ``reason``. Any other failure is printed and the loop resumes after a
    1 second pause. This coroutine never returns on its own.
    """
    headers = {"Authorization": f"Bearer {TOKEN}"}
    next_url = f"{QUEUE_URL}/next"

    async with httpx.AsyncClient() as client:
        while True:
            try:
                resp = await client.get(next_url, headers=headers)
                if resp.status_code != 200:
                    # Nothing to process (or an error response): back off briefly.
                    await asyncio.sleep(1)
                    continue

                payload = resp.json()
                # Delivery metadata (ack/nack URLs) arrives as JSON in a header.
                meta = json.loads(resp.headers.get("X-Hooque-Meta", "{}"))

                try:
                    # Process your business logic here
                    print(f"Processing: {payload}")
                except Exception as e:
                    print(f"Processing error: {e}")
                    # Nack will retry the message.
                    # The Authorization header is sent here too, matching the
                    # synchronous examples.
                    await client.post(
                        meta["nackUrl"],
                        json={"reason": str(e)},
                        headers=headers,
                    )
                else:
                    # Ack only after processing succeeded; kept outside the
                    # try body so a failed ack call is not nacked as a
                    # processing error.
                    await client.post(meta["ackUrl"], headers=headers)
            except Exception as e:
                print(f"Async error: {e}")
                await asyncio.sleep(1)
if __name__ == "__main__":
    # Start an event loop and run the async worker until the process is stopped.
    asyncio.run(async_worker())
Streaming (SSE)
Real-time message delivery over a persistent connection.
import sseclient
import requests
import json
def start_stream():
    """Consume messages from the SSE stream, acknowledging each one.

    Opens ``{QUEUE_URL}/stream`` as a streaming response and iterates its
    server-sent events. For every ``message`` event the JSON body is decoded
    into ``payload`` and ``meta``; the message is acked via ``meta['ackUrl']``
    after processing, or nacked via ``meta['nackUrl']`` (with the error text
    as ``reason``) if processing or the ack call raises. The function returns
    when the event stream ends; the connection is closed on the way out.
    """
    url = f"{QUEUE_URL}/stream"
    headers = {"Authorization": f"Bearer {TOKEN}"}
    # Bound only the connect phase: the stream itself can stay quiet for as
    # long as no message arrives, so no read timeout is set.
    stream_timeout = (10, None)
    # Ack/nack calls are ordinary short requests and should never hang.
    request_timeout = 30  # seconds

    # The context manager releases the streaming connection when the loop
    # ends or an exception propagates.
    with requests.get(url, headers=headers, stream=True, timeout=stream_timeout) as response:
        client = sseclient.SSEClient(response)
        for event in client.events():
            if event.event != 'message':
                continue

            data = json.loads(event.data)
            payload = data['payload']
            meta = data['meta']
            try:
                # Process your business logic here
                print(f"Received: {payload}")
                requests.post(meta['ackUrl'], headers=headers, timeout=request_timeout)
            except Exception as e:
                print(f"Error: {e}")
                # Nack will retry the message
                requests.post(
                    meta['nackUrl'],
                    json={"reason": str(e)},
                    headers=headers,
                    timeout=request_timeout,
                )
if __name__ == "__main__":
    # Open the stream when executed as a script (blocks until the stream ends).
    start_stream()
import json

import requests
import sseclient
def start_stream():
    """Consume messages from the SSE stream with automatic acknowledgement.

    Opens ``{QUEUE_URL}/stream?autoAck=true`` as a streaming response and
    iterates its server-sent events. For every ``message`` event the JSON
    body is decoded and its ``payload`` is processed; no ack or nack call is
    made. The function returns when the event stream ends; the connection is
    closed on the way out.
    """
    url = f"{QUEUE_URL}/stream?autoAck=true"
    headers = {"Authorization": f"Bearer {TOKEN}"}
    # Bound only the connect phase: the stream itself can stay quiet for as
    # long as no message arrives, so no read timeout is set.
    stream_timeout = (10, None)

    # The context manager releases the streaming connection when the loop
    # ends or an exception propagates.
    with requests.get(url, headers=headers, stream=True, timeout=stream_timeout) as response:
        client = sseclient.SSEClient(response)
        for event in client.events():
            if event.event != 'message':
                continue

            data = json.loads(event.data)
            # Process your business logic here
            print(f"Received (Auto Ack): {data['payload']}")
if __name__ == "__main__":
    # Open the auto-ack stream when executed as a script (blocks until the stream ends).
    start_stream()