Introduction to Websocket Streaming with RunPod Serverless

In this follow-up to our 'Hello World' tutorial, we'll create a serverless endpoint that accepts a base64-encoded file and streams results back while it processes the data. This demonstrates how to handle file input and output in our serverless environment by encoding the file as data within a JSON payload.
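To make the encoding step concrete, here is a minimal sketch of wrapping binary data in a JSON body and recovering it again, using only the standard library. The helper names (encode_file_bytes, decode_file_bytes) are made up for illustration; the "input" wrapper and the base64_image key match the payload used later in this tutorial.

```python
import base64
import json


def encode_file_bytes(data: bytes, key: str = "base64_image") -> str:
    """Wrap raw bytes in a JSON request body as base64 text (hypothetical helper)."""
    encoded = base64.b64encode(data).decode("ascii")
    return json.dumps({"input": {key: encoded}})


def decode_file_bytes(body: str, key: str = "base64_image") -> bytes:
    """Recover the raw bytes from a JSON request body (hypothetical helper)."""
    return base64.b64decode(json.loads(body)["input"][key])


# 768 bytes of stand-in binary data; any file contents would work the same way
original = bytes(range(256)) * 3

body = encode_file_bytes(original)
restored = decode_file_bytes(body)
encoded_len = len(json.loads(body)["input"]["base64_image"])

print(restored == original)                # True
print(len(original), "->", encoded_len)    # 768 -> 1024
```

Base64 turns every 3 bytes into 4 characters, so the encoded text is roughly a third larger than the file itself. Keep that in mind when sizing payloads.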

Prerequisites

As before, this tutorial is aimed at macOS developers.

Step 1: Creating the Project

Create the project directory, set up a virtual environment, and install the dependencies.

mkdir runpod-base64-stream
cd runpod-base64-stream
python -m venv venv
source venv/bin/activate
pip install runpod Pillow

Step 2: Creating the Handler

Create a new file called handler.py. Remember that the handler is the code that runs whenever a worker is active and picks up a job. In this example, the handler simulates image processing. Because the goal is to demonstrate the serverless environment rather than real image processing, the handler only validates the incoming image and streams back metadata for each chunk; the test script in Step 5 generates a plain, solid-color image to use as the payload.

import runpod
import base64
import io
from PIL import Image
import time

def process_image_chunk(chunk_data, chunk_number, total_chunks):
    """
    Simulate processing a chunk of image data.
    In a real application, you might do actual image processing here.
    """
    return {
        "chunk_number": chunk_number,
        "chunk_size": len(chunk_data),
        "processed_at": time.strftime("%H:%M:%S")
    }

def generator_handler(job):
    """
    Handler that processes a base64 encoded image in chunks and streams results.
    """
    job_input = job["input"]
    
    # Get the base64 string from input
    base64_string = job_input.get("base64_image")
    if not base64_string:
        yield {"error": "No base64_image provided in input"}
        return

    try:
        # Decode base64 string
        image_data = base64.b64decode(base64_string)
        
        # Open image to validate and get info
        image = Image.open(io.BytesIO(image_data))
        
        # Get image info for initial metadata
        yield {
            "status": "started",
            "image_info": {
                "format": image.format,
                "size": image.size,
                "mode": image.mode
            }
        }

        # Simulate processing image in chunks
        # In a real application, you might process different parts of the image
        # Ceiling division keeps this to at most 4 chunks and avoids a zero chunk size
        chunk_size = max(1, (len(image_data) + 3) // 4)
        total_chunks = (len(image_data) + chunk_size - 1) // chunk_size

        for i in range(total_chunks):
            start_idx = i * chunk_size
            end_idx = min(start_idx + chunk_size, len(image_data))
            chunk = image_data[start_idx:end_idx]

            # Process this chunk
            result = process_image_chunk(chunk, i + 1, total_chunks)
            
            # Add progress information
            result["progress"] = f"{i + 1}/{total_chunks}"
            result["percent_complete"] = ((i + 1) / total_chunks) * 100
            
            # Stream the result for this chunk
            yield result
            
            # Simulate processing time
            time.sleep(1)

        # Send final completion message
        yield {
            "status": "completed",
            "total_chunks_processed": total_chunks,
            "final_timestamp": time.strftime("%H:%M:%S")
        }

    except Exception as e:
        yield {"error": str(e)}

# Start the serverless function with streaming enabled
runpod.serverless.start({
    "handler": generator_handler,
    "return_aggregate_stream": True
})
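The chunking arithmetic in the handler can be checked locally without deploying anything. The sketch below is a stand-in, not the handler itself: split_into_chunks and stream_progress are hypothetical names, and it does not import runpod. It uses ceiling division so the data is split into at most `parts` chunks; with floor division, a 10-byte input would produce five chunks of two bytes instead of the intended four.

```python
import math


def split_into_chunks(data: bytes, parts: int = 4) -> list:
    """Split data into at most `parts` chunks (hypothetical helper)."""
    if not data:
        return []
    # Round up so a remainder never spills into an extra chunk
    chunk_size = math.ceil(len(data) / parts)
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


def stream_progress(data: bytes, parts: int = 4):
    """Yield one progress message per chunk, shaped like the handler's output."""
    chunks = split_into_chunks(data, parts)
    total = len(chunks)
    for number, chunk in enumerate(chunks, start=1):
        yield {
            "chunk_number": number,
            "chunk_size": len(chunk),
            "progress": f"{number}/{total}",
            "percent_complete": number / total * 100,
        }


# Drive the generator the same way a streaming worker would consume it
updates = list(stream_progress(b"x" * 10))
for update in updates:
    print(update)
```

Because a generator handler is an ordinary Python generator, calling list() on it or looping over it is enough to see every message it would stream.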

Step 3: Creating the Dockerfile and requirements.txt

As with the previous tutorial, we'll need to provide the Dockerfile and requirements.txt to build and push the image.

FROM python:3.9-slim

WORKDIR /app

# Install system dependencies for Pillow
RUN apt-get update && apt-get install -y \
    libjpeg-dev \
    zlib1g-dev \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY handler.py .

CMD [ "python", "-u", "handler.py" ]

The requirements.txt file lists the two Python dependencies:

runpod==1.3.0
Pillow==9.5.0

Step 4: Build and Push to DockerHub

As before, build and push your image to DockerHub, and then pull it into your endpoint.

docker build --platform linux/amd64 -t your-dockerhub-username/runpod-base64-stream:latest .
docker push your-dockerhub-username/runpod-base64-stream:latest

Step 5: Running the Endpoint in Code

Next is an example of how to call the endpoint from code. Save it as test_endpoint.py and set your RunPod API key and endpoint ID in the variables at the top.

import requests
import json
import time
import base64
from PIL import Image
import io
import os

API_KEY = "#INSERT_RUNPOD_API_KEY HERE"
ENDPOINT_ID = "#INSERT_RUNPOD_ENDPOINT_ID HERE"

# Set up the output directory for saving images
# os.getcwd() gets the current working directory (where this script is running)
# os.path.join combines paths in a way that works on all operating systems
OUTPUT_DIR = os.path.join(os.getcwd(), "output_images")

# Create the output directory if it doesn't exist
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)
    print(f"Created output directory: {OUTPUT_DIR}")

def create_test_image():
    """
    Creates a test image and converts it to base64 format.
    
    Returns:
        str: The image encoded as a base64 string
    """
    # Create a new 100x100 pixel image with a red background
    img = Image.new('RGB', (100, 100), color='red')
    
    # Create a bytes buffer to hold the image data
    img_byte_arr = io.BytesIO()
    
    # Save the image to the buffer in PNG format
    img.save(img_byte_arr, format='PNG')
    
    # Get the byte data from the buffer
    img_byte_arr = img_byte_arr.getvalue()
    
    # Save a copy of the input image to disk
    input_path = os.path.join(OUTPUT_DIR, 'test_image_input.png')
    img.save(input_path)
    print(f"Saved input test image as: {input_path}")
    
    # Convert the image bytes to base64 string and return it
    return base64.b64encode(img_byte_arr).decode('utf-8')

def save_base64_image(base64_string, filename):
    """
    Converts a base64 string back to an image and saves it to disk.
    
    Args:
        base64_string (str): The image data as a base64 string
        filename (str): The name to give the saved file
    """
    try:
        # Create the full path where the file will be saved
        output_path = os.path.join(OUTPUT_DIR, filename)
        
        # Convert the base64 string back to bytes
        image_data = base64.b64decode(base64_string)
        
        # Create an image from the bytes
        image = Image.open(io.BytesIO(image_data))
        
        # Save the image as a PNG file
        image.save(output_path, 'PNG')
        print(f"Saved processed image as: {output_path}")
        
        return True
    except Exception as e:
        print(f"Error saving image: {str(e)}")
        return False

# Set up the API endpoint URLs
run_url = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/run"

# Set up the headers for the API request
# The Authorization header is required for authentication
# Content-Type tells the API we're sending JSON data
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# Print a redacted version of the authorization header for debugging
print("Using Authorization header:", headers["Authorization"][:10] + "...")

# Create the test image and get its base64 representation
base64_image = create_test_image()

# Create the payload (data) for our API request
# The structure matches what the RunPod handler expects
payload = {
    "input": {
        "base64_image": base64_image
    }
}

# Send the initial request to start the job
print("\nSending request to:", run_url)
response = requests.post(run_url, headers=headers, json=payload)

# Print debug information about the response
print("Status Code:", response.status_code)
print("Response Headers:", response.headers)
print("Raw Response:", response.text)

# Check for authentication errors
if response.status_code == 401:
    print("\nAuthentication Error: Please check your API key")
    exit()

# Main try-except block for handling the API interaction
try:
    # Parse the JSON response
    job_status = response.json()
    job_id = job_status["id"]
    print(f"\nStarted job: {job_id}")

    # Set up the streaming URL for getting results
    stream_url = f"https://api.runpod.ai/v2/{ENDPOINT_ID}/stream/{job_id}"

    # Keep checking for results until the job reaches a terminal state
    while True:
        # Get the current status of the job along with any new output
        stream_response = requests.get(stream_url, headers=headers)
        stream_data = stream_response.json()
        status = stream_data["status"]

        # Process any new output first, so data that arrives together
        # with the final status is not dropped
        for output in stream_data.get("stream") or []:
            print(f"Received: {json.dumps(output, indent=2)}")

            # If we received a processed image, save it
            if "processed_image" in output:
                filename = f"output_image_{output.get('chunk_number', 'final')}.png"
                save_base64_image(output["processed_image"], filename)

        # Check if the job is completed
        if status == "COMPLETED":
            print("\nJob completed!")
            break

        # Check if the job failed
        elif status == "FAILED":
            print("\nJob failed!")
            print(stream_data.get("error", "No error message provided"))
            break

        # Wait a bit before checking again
        time.sleep(0.5)

# Handle various types of errors that might occur
except json.JSONDecodeError as e:
    print("\nError decoding JSON response:", str(e))
except KeyError as e:
    print("\nError accessing response data:", str(e))
    print("Full response:", job_status)
except Exception as e:
    print("\nUnexpected error:", str(e))

This script sends a request to your endpoint with the image encoded as base64 in the JSON payload, then polls the stream URL and prints each result as the worker produces it. If a result contains a processed_image field, the script decodes it and saves it locally.
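The polling logic can also be factored into a small generator, which makes it testable without a live endpoint. This is a sketch under assumptions: poll_stream, its fetch callback, and the max_wait limit are illustrative names and choices, not part of the RunPod SDK. The only response fields it relies on are the "status" and "stream" keys already used in test_endpoint.py.

```python
import time

# Statuses after which no further output is expected (the two used in this tutorial)
TERMINAL_STATUSES = {"COMPLETED", "FAILED"}


def poll_stream(fetch, interval=0.5, max_wait=60.0):
    """Yield streamed items until the job ends or max_wait seconds pass.

    `fetch` is any zero-argument callable returning the parsed JSON of one
    stream request, e.g. lambda: requests.get(stream_url, headers=headers).json()
    """
    deadline = time.monotonic() + max_wait
    while True:
        data = fetch()
        # Hand over new items before checking status, so output delivered
        # with the final response is not lost
        for item in data.get("stream") or []:
            yield item
        if data.get("status") in TERMINAL_STATUSES:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError("job did not finish within max_wait seconds")
        time.sleep(interval)


# Canned responses standing in for real HTTP calls
responses = iter([
    {"status": "IN_QUEUE"},
    {"status": "IN_PROGRESS", "stream": [{"chunk_number": 1}]},
    {"status": "COMPLETED", "stream": [{"status": "completed"}]},
])

received = list(poll_stream(lambda: next(responses), interval=0))
print(received)
```

Passing the fetch step in as a callable keeps the loop independent of requests, and the deadline means a stuck job cannot keep the client polling forever.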

Run the test:

python test_endpoint.py

You should see output similar to this, and a copy of the input test image saved in the output_images folder under the directory you ran the script from.

Started job: 123e4567-e89b-12d3-a456-426614174000
Received: {
  "status": "started",
  "image_info": {
    "format": "PNG",
    "size": [100, 100],
    "mode": "RGB"
  }
}
Received: {
  "chunk_number": 1,
  "chunk_size": 2500,
  "processed_at": "14:30:45",
  "progress": "1/4",
  "percent_complete": 25.0
}
...
Received: {
  "status": "completed",
  "total_chunks_processed": 4,
  "final_timestamp": "14:30:48"
}
Job completed!

Conclusion

You've now learned how to create a RunPod serverless endpoint that can process base64-encoded files and stream results back to the client. This pattern can be extended to handle various types of file processing tasks while providing real-time feedback to users.