Getting Started
Overview
Nightline is a Python library designed to simplify event streaming and message processing, with a focus on making AWS SQS (and others) event handling straightforward and efficient.
Installation
Install Nightline using pip:
Quick Start
Basic Usage
Here's a simple example of how to use Nightline to listen to an SQS queue and process messages:
from nightline.services.sqs import AWSSQSEventStreamListener
from pydantic import BaseModel
# Define your message model
class OrderMessage(BaseModel):
order_id: int
total: float
items: list[str]
# Create a listener for your SQS queue
listener = AWSSQSEventStreamListener(queue_url="https://your_queue_url")
# Define a message processing function
def process_message(message: OrderMessage):
print(f"Processing order {message.order_id}")
# Add your message processing logic here
# Start listening for messages
listener.listen(process_message)
Configuration
Nightline allows you to customize the event stream listener with a configuration object:
from nightline.services.sqs import AWSSQSEventStreamListener, EventStreamConfig
# Create a custom configuration
config = EventStreamConfig(
max_workers=4, # Increase concurrent processors
max_messages=20, # Retrieve more messages per batch
wait_time_seconds=30, # Longer polling interval
auto_ack=True # Automatically acknowledge processed messages
)
# Create listener with custom configuration
listener = AWSSQSEventStreamListener(
queue_url="https://your_queue_url",
config=config
)
Configuration Options
The EventStreamConfig allows you to fine-tune your event stream processing:
max_workers: Controls the number of concurrent message processors- Default: 2
-
Increases parallelism for faster message processing
-
max_messages: Limits the number of messages retrieved in a single batch - Default: 10
-
Helps manage memory and processing load
-
wait_time_seconds: Sets the long polling wait time for message retrieval - Default: 20 seconds
-
Reduces API calls and improves responsiveness
-
auto_ack: Determines automatic message acknowledgment - Default: True
- When enabled, successfully processed messages are automatically removed from the queue
Best Practices
- Message Model: Always use a well-defined Pydantic model to validate incoming messages
- Error Handling: Implement robust error handling in your
process_messagefunction - Logging: Add logging to track message processing and potential issues
- Scalability: Adjust
max_workersbased on your processing requirements
Advanced Usage
For more complex scenarios, you can perform custom error handling:
from nightline.services.sqs import AWSSQSEventStreamListener
import logging
def on_error(error: Exception, message: dict):
logging.error(f"Failed to process message: {error}")
# Implement custom error handling logic
listener = AWSSQSEventStreamListener(queue_url="https://your_queue_url")
listener.listen(process_message, error_handler=on_error)
Requirements
- Python 3.10+
- AWS SDK for Python (Boto3)
- Pydantic
Troubleshooting
- Ensure AWS credentials are properly configured
- Check queue URL and permissions
- Verify message format matches your defined model
Contributing
Contributions are welcome! Please check our GitHub repository for guidelines.
License
This package is released under Apache V2.