Skip to content

Consume with Python

This guide provides practical examples for consuming messages from Hooque queues using Python.

Polling (REST API)

The Pull model is ideal for workers that process messages at their own pace.

This is the most reliable way to consume messages. You manually acknowledge (Ack) only after successful processing.

import requests
import json
import time

# Obtain this from your consumer configuration in the Hooque dashboard
QUEUE_URL = "https://app.hooque.io/queues/cons_xxxxxxxx"
TOKEN = "your_consumer_token"

def worker_loop():
    headers = {"Authorization": f"Bearer {TOKEN}"}
    next_url = f"{QUEUE_URL}/next"

    print(f"Starting worker for {QUEUE_URL}...")

    while True:
        try:
            # 1. Pull the next message
            response = requests.get(next_url, headers=headers)

            if response.status_code != 200:
                # Queue is empty or error, wait a bit
                time.sleep(1)
                continue

            payload = response.json()
            meta = json.loads(response.headers.get("X-Hooque-Meta", "{}"))

            try:
                # 2. Process your business logic here
                print(f"Processing message {meta.get('messageId')}: {payload}")

                # 3. Success: Acknowledge (Ack) to delete message from queue
                requests.post(meta["ackUrl"], headers=headers)

            except Exception as e:
                print(f"Error processing message: {e}")

                # 4. Failure: Negative Acknowledge (Nack) 
                # This will cause the message to be retried according to your queue policy.
                requests.post(meta["nackUrl"], json={"reason": str(e)}, headers=headers)

                # Note: You can use meta["rejectUrl"] if the failure is permanent 
                # and you don't want to retry (e.g., invalid data format).

        except Exception as e:
            print(f"Worker polling error: {e}")
            time.sleep(5)

if __name__ == "__main__":
    # Option A: Run directly (blocks main thread)
    worker_loop()

    # Option B: Run in a background thread
    # import threading
    # threading.Thread(target=worker_loop, daemon=True).start()
    # while True: time.sleep(1)

Hooque deletes the message immediately upon delivery. Use this for non-critical tasks where "at-least-once" delivery isn't required.

import requests
import time

QUEUE_URL = "https://app.hooque.io/queues/cons_xxxxxxxx"
TOKEN = "your_consumer_token"

def worker_loop():
    headers = {"Authorization": f"Bearer {TOKEN}"}
    next_url = f"{QUEUE_URL}/next"
    params = {"autoAck": "true"}

    print(f"Starting worker (Auto Ack) for {QUEUE_URL}...")

    while True:
        try:
            response = requests.get(next_url, headers=headers, params=params)

            if response.status_code != 200:
                time.sleep(1)
                continue

            payload = response.json()

            # Process your business logic here. 
            # Message is already deleted from the queue.
            print(f"Processing: {payload}")

        except Exception as e:
            print(f"Worker error: {e}")
            time.sleep(5)

if __name__ == "__main__":
    worker_loop()

Async Polling

import asyncio
import httpx
import json

async def async_worker():
    headers = {"Authorization": f"Bearer {TOKEN}"}
    next_url = f"{QUEUE_URL}/next"

    async with httpx.AsyncClient() as client:
        while True:
            try:
                resp = await client.get(next_url, headers=headers)

                if resp.status_code != 200:
                    await asyncio.sleep(1)
                    continue

                payload = resp.json()
                meta = json.loads(resp.headers.get("X-Hooque-Meta", "{}"))

                try:
                    # Process your business logic here
                    print(f"Processing: {payload}")

                    await client.post(meta["ackUrl"])
                except Exception as e:
                    print(f"Processing error: {e}")
                    # Nack will retry the message
                    await client.post(meta["nackUrl"], json={"reason": str(e)})
            except Exception as e:
                print(f"Async error: {e}")
                await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(async_worker())
async def async_worker():
    next_url = f"{QUEUE_URL}/next"
    params = {"autoAck": "true"}

    async with httpx.AsyncClient() as client:
        while True:
            resp = await client.get(next_url, params=params)
            if resp.status_code == 200:
                print(f"Processing: {resp.json()}")
            else:
                await asyncio.sleep(1)

Streaming (SSE)

Real-time message delivery over a persistent connection.

import sseclient
import requests
import json

def start_stream():
    url = f"{QUEUE_URL}/stream"
    headers = {"Authorization": f"Bearer {TOKEN}"}

    response = requests.get(url, headers=headers, stream=True)
    client = sseclient.SSEClient(response)

    for event in client.events():
        if event.event == 'message':
            data = json.loads(event.data)
            payload = data['payload']
            meta = data['meta']

            try:
                # Process your business logic here
                print(f"Received: {payload}")
                requests.post(meta['ackUrl'], headers=headers)
            except Exception as e:
                print(f"Error: {e}")
                # Nack will retry the message
                requests.post(meta['nackUrl'], json={"reason": str(e)}, headers=headers)

if __name__ == "__main__":
    start_stream()
import sseclient
import requests

def start_stream():
    url = f"{QUEUE_URL}/stream?autoAck=true"
    headers = {"Authorization": f"Bearer {TOKEN}"}

    response = requests.get(url, headers=headers, stream=True)
    client = sseclient.SSEClient(response)

    for event in client.events():
        if event.event == 'message':
            data = json.loads(event.data)
            # Process your business logic here
            print(f"Received (Auto Ack): {data['payload']}")

if __name__ == "__main__":
    start_stream()