Python SDK

Official SDKs of Python for integrating BagelPay.

The BagelPay Python SDK is available as an pypi package for seamless integration with your Python projects.

The SDK is also available on GitHub for those who prefer to explore the source code directly.

The API document is here.

๐Ÿ“ฆ Installation Guide

System Requirements

  • Python: 3.11 or higher (recommended), minimum 3.8

  • Package Manager: pip or poetry

# Install latest stable version
pip install bagelpay

# Upgrade to latest version
pip install --upgrade bagelpay

Method 2: Using Poetry

# Add to your project
poetry add bagelpay

Method 3: Install from Source (Development)

# Clone the repository
git clone https://github.com/bagelpay/bagelpay-sdk-python.git

# Create virtual environment
python3 -m venv venv
source venv/bin/activate  # Linux/Mac
# or venv\Scripts\activate  # Windows

# Install SDK in development mode
pip install -e .

# Verify installation
python -c "import bagelpay; print('Installation successful!')"

Verify Installation

import bagelpay
from bagelpay import BagelPayClient

print(f"BagelPay SDK Version: {bagelpay.__version__}")
print(f"Available modules: {dir(bagelpay)}")

# Test basic functionality
try:
    client = BagelPayClient(api_key="test")
    print("โœ… SDK imported successfully")
except Exception as e:
    print(f"โŒ Import failed: {e}")

๐Ÿš€ Quick Start

import random
from bagelpay import BagelPayClient, CheckoutRequest, CreateProductRequest

# 1. Create account at [BagelPay Dashboard](https://dashboard.bagelpay.io)
# 2. Go to "Developer Settings" โ†’ "API Keys"
# 3. Generate new API key for your environment
# 3. Save your test and live keys securely

# 1. Initialize the client
client = BagelPayClient(
    api_key="your-api-key-here"
)

# 2. Create a sample product
product = client.create_product(CreateProductRequest(
    name="Product_name",
    description="Description_of_product",
    price=29.99
    currency="USD",
    billing_type="subscription",
    tax_inclusive=False,
    tax_category="digital_products",
    recurring_interval="daily",
    trial_days=1
    ))

# 3. Get payment URL
print(f"Product URL: {product.product_url}")

# 4. Complete the checkout session
# For testing payments, you could use the test card '4242 4242 4242 4242' with any expiration and CVV.

โš™๏ธ Basic Configuration

Getting API Keys

  1. Sign up: Create account at BagelPay Dashboard

  2. Navigate: Go to "Developer Settings" โ†’ "API Keys"

  3. Create Key: Generate new API key for your environment

  4. Copy: Save your test and live keys securely

  5. Environment: Start with test keys for development

Environment Variables Setup

# Create .env file in your project root
echo "BAGELPAY_API_KEY=your-test-api-key-here" > .env
echo "BAGELPAY_BASE_URL=https://test.bagelpay.io" >> .env
echo "BAGELPAY_TIMEOUT=30" >> .env

# Load environment variables
export $(cat .env | xargs)

# Or use python-dotenv
pip install python-dotenv

Client Initialization Options

from bagelpay import BagelPayClient
import os


# Method 1: Direct parameters
client = BagelPayClient(
    base_url="https://test.bagelpay.io",
    api_key="your-api-key",
    timeout=30,  # Request timeout in seconds
)

# Method 2: Configuration dictionary
config = {
    "base_url": "https://test.bagelpay.io",
    "api_key": os.getenv("BAGELPAY_API_KEY"),
    "timeout": 30
}
client = BagelPayClient(**config)

# Method 4: Context manager (recommended for production)
with BagelPayClient(api_key="your-api-key") as client:
    # Automatically handles connection cleanup
    response = client.list_products()
    print(f"Found {response.total} products")

Environment-Specific Configuration

# Development environment
dev_client = BagelPayClient(
    base_url="https://test.bagelpay.io",
    api_key="test_key_xxx",
    timeout=60
)

# Staging environment
staging_client = BagelPayClient(
    base_url="https://staging.bagelpay.io",
    api_key="staging_key_xxx",
    timeout=30
)

# Production environment
production_client = BagelPayClient(
    base_url="https://api.bagelpay.io",
    api_key="live_key_xxx",
    timeout=15,
)

๐Ÿ“š Beginner Tutorial

Step 1: Create Your First Product

# List all products
try:
    products = client.list_products(pageNum=1, pageSize=50)
    
    print(f"\n๐Ÿ›๏ธ Product Catalog ({products.total} total):")
    
    for product in products.items:
        status = "๐ŸŸข Active" if not product.is_archive else "๐Ÿ”ด Archived"
        print(f"\n{status} {product.name}")
        print(f"   ID: {product.product_id}")
        if product.recurring_interval:
            print(f"   Price: ${product.price} {product.currency}/{product.recurring_interval}")
        else:
            print(f"   Price: ${product.price} {product.currency}")
        print(f"   Type: {product.billing_type}")
        print(f"   Created: {product.created_at}")
    
    # Update a product
    if products.items:
        first_product = products.items[0]
        
        from bagelpay import UpdateProductRequest
        import random
        
        update_request = UpdateProductRequest(
            product_id=first_product.product_id,
            name="New_Product_" + str(random.randint(1000, 9999)),
            description="New_Description_of_product_" + str(random.randint(1000, 9999)),
            price=random.uniform(50.5, 1024.5),
            currency="USD",
            billing_type=random.choice(["subscription", "subscription", "single_payment"]),
            tax_inclusive=False,
            tax_category=random.choice(["digital_products", "saas_services", "ebooks"]),
            recurring_interval=random.choice(["daily", "weekly", "monthly", "3months", "6months"]),
            trial_days=random.choice([0, 1, 7])
        )
        
        updated_product = client.update_product(update_request)
        print(f"\nโœ… Updated product: {updated_product.name}")
        print(f"   New price: ${updated_product.price}")
        
except Exception as e:
    print(f"โŒ Failed to manage products: {e}")

Step 2: Create a Payment Session

from bagelpay import BagelPayClient, CreateProductRequest

# Initialize client
client = BagelPayClient(api_key="your-test-api-key")

# Create a digital product
product_request = CreateProductRequest(
    name="Premium Membership",
    description="Access to all premium features with monthly billing",
    price=29.99,
    currency="USD",
    billing_type="subscription", # subscription or single_payment
    tax_inclusive=True,
    tax_category="digital_products", # digital_products, saas_services or ebooks
    recurring_interval="daily", # daily, weekly, monthly, 3months or 6months
    trial_days=1,
)

try:
    product = client.create_product(product_request)
    print(f"โœ… Product created successfully!")
    print(f"Product ID: {product.product_id}")
    print(f"Product Name: {product.name}")
    print(f"Price: ${product.price} {product.currency}")
except Exception as e:
    print(f"โŒ Failed to create product: {e}")

Step 3: Monitor Transactions

from bagelpay import CheckoutRequest, Customer

# Prepare customer information
customer = Customer(
    email="john.doe@example.com"
)

# Create checkout request
from datetime import datetime

checkout_request = CheckoutRequest(
    product_id=product.product_id,  # Use the product ID from step 1
    request_id=f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
    units="1",
    customer=customer,
    success_url="https://yoursite.com/success",
    metadata={
        "order_id": f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
        "user_id": "user_12345",
        "source": "web_app",
        "campaign": "summer_promotion"
    }
)

try:
    checkout_response = client.create_checkout(checkout_request)
    
    print(f"โœ… Checkout session created!")
    print(f"Payment URL: {checkout_response.checkout_url}")
    print(f"Payment ID: {checkout_response.payment_id}")
    print(f"Product ID: {checkout_response.product_id}")
    print(f"Status: {checkout_response.status}")
    print(f"Expires On: {checkout_response.expires_on}")
    
    # Store payment ID for later reference
    payment_id = checkout_response.payment_id
    
except Exception as e:
    print(f"โŒ Failed to create checkout: {e}")

Step 4: Manage Products

# Get transaction history
try:
    transactions = client.list_transactions(pageNum=1, pageSize=20)
    
    print(f"\n๐Ÿ“Š Transaction Summary:")
    print(f"Total Transactions: {transactions.total}")
    print(f"Showing {len(transactions.items)} transactions on this page")
    print(f"Items per Page: 20")
except Exception as e:
    print(f"โŒ Failed to fetch transactions: {e}")

๐Ÿ”ง API Reference

Product Management API

Creating Products

# One-time payment product
one_time_product = CreateProductRequest(
    name="E-book: Python Programming Guide",
    description="Comprehensive guide to Python programming",
    price=49.99,
    currency="USD",
    billing_type="single_payment",
    tax_inclusive=False,
    tax_category="digital_products",
    recurring_interval="none",
    trial_days=0
)

# Subscription product
subscription_product = CreateProductRequest(
    name="Monthly Pro Plan",
    description="Professional features with monthly billing",
    price=19.99,
    currency="USD",
    billing_type="subscription",
    tax_inclusive=True,
    tax_category="digital_products",
    recurring_interval="monthly",
    trial_days=14
)

Product Operations

# Get product details
product = client.get_product("prod_123456")
print(f"Product: {product.name}")
print(f"Status: {'Active' if not product.is_archive else 'Archived'}")

# Archive product (stop selling but keep records)
archived_product = client.archive_product("prod_123456")
print(f"Product archived: {archived_product.is_archive}")

# Unarchive product
unarchived_product = client.unarchive_product("prod_123456")
print(f"Product restored: {not unarchived_product.is_archive}")

# Bulk product operations
all_products = []
page_num = 1
while True:
    products = client.list_products(pageNum=page_num, pageSize=100)
    all_products.extend(products.items)
    
    if len(products.items) < 100:
        break
    page_num += 1

print(f"Total products loaded: {len(all_products)}")

# Filter products by criteria
active_products = [p for p in all_products if not p.is_archive]
subscription_products = [p for p in all_products if p.billing_type == "subscription"]
expensive_products = [p for p in all_products if p.price > 100]

print(f"Active products: {len(active_products)}")
print(f"Subscription products: {len(subscription_products)}")
print(f"Premium products (>$100): {len(expensive_products)}")

Payment Session API

Advanced Checkout Configuration

# Comprehensive checkout request
from datetime import datetime

advanced_checkout = CheckoutRequest(
    product_id="prod_premium_plan",
    request_id=f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
    units="3",
    customer=Customer(
        email="premium.user@company.com"
    ),
    success_url="https://yoursite.com/success",
    metadata={
        # Campaign tracking
        "order_id": f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
        "campaign_id": "black_friday_2024",
        "discount_code": "SAVE30",
        "affiliate_id": "partner_123",
        
        # User context
        "user_id": "user_789",
        "user_tier": "enterprise",
        "company": "Tech Corp Inc",
        
        # Analytics
        "source": "landing_page",
        "medium": "organic",
        "referrer": "https://google.com",
        "utm_campaign": "product_launch"
    }
)

response = client.create_checkout(advanced_checkout)

# Extract detailed checkout information
print(f"\n๐Ÿ’ณ Checkout Session Created:")
print(f"Payment ID: {response.payment_id}")
print(f"Checkout URL: {response.checkout_url}")
print(f"Product ID: {response.product_id}")
print(f"Status: {response.status}")
print(f"Expires On: {response.expires_on}")
print(f"Success URL: {response.success_url}")

Subscription Management API

# Comprehensive subscription management
def manage_subscriptions(client):
    """Manage customer subscriptions"""
    try:
        subscriptions = client.list_subscriptions(pageNum=1, pageSize=100)
        
        print(f"\n๐Ÿ”„ Subscription Management ({subscriptions.total} total):")
        
        active_subs = []
        cancelled_subs = []
        paused_subs = []
        trialing_subs = []
        
        for subscription in subscriptions.items:
            print(f"\n๐Ÿ“‹ Subscription: {subscription.subscription_id}")
            print(f"   Status: {subscription.status}")
            print(f"   Customer: {subscription.customer}")
            print(f"   Product: {subscription.product_name}")
            print(f"   Next Billing: {subscription.billing_period_end}")
            print(f"   Next Billing Amount: ${subscription.next_billing_amount}")
            
            if subscription.status == "active":
                active_subs.append(subscription)
            elif subscription.status == "canceled":
                cancelled_subs.append(subscription)
            elif subscription.status == "paused":
                paused_subs.append(subscription)
            elif subscription.status == "trialing":
                trialing_subs.append(subscription)

        print(f"\n๐Ÿ“ˆ Subscription Summary:")
        print(f"Active: {len(active_subs)}")
        print(f"Cancelled: {len(cancelled_subs)}")
        print(f"Paused: {len(paused_subs)}")
        print(f"Trialing: {len(trialing_subs)}")
        
        # Calculate MRR (Monthly Recurring Revenue)
        monthly_revenue = sum(
            sub.next_billing_amount for sub in active_subs 
            if sub.recurring_interval == "monthly"
        )
        annual_revenue = sum(
            sub.next_billing_amount / 12 for sub in active_subs 
            if sub.recurring_interval == "yearly"
        )
        total_mrr = monthly_revenue + annual_revenue
        
        print(f"๐Ÿ’ฐ Monthly Recurring Revenue: ${total_mrr:.2f}")
        
        return {
            "active": len(active_subs),
            "cancelled": len(cancelled_subs),
            "paused_subs": len(paused_subs),
            "trialing_subs": len(trialing_subs),
            "mrr": total_mrr
        }
        
    except Exception as e:
        print(f"โŒ Error managing subscriptions: {e}")
        return None

# Cancel subscription with reason
def cancel_subscription_with_reason(client, subscription_id):
    """Cancel subscription with cancellation reason"""
    try:
        # Note: This assumes the SDK supports cancellation reasons
        result = client.cancel_subscription(
            subscription_id,
        )
        print(f"โœ… Subscription {subscription_id} cancelled")
        return result
    except Exception as e:
        print(f"โŒ Failed to cancel subscription: {e}")
        return None

manage_subscriptions(client)
cancel_subscription_with_reason(client, "sub_1966676965965533186")

๐Ÿš€ Webhook Integration

import hmac
import hashlib
import json

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
from pyngrok import ngrok
ngrok.set_auth_token("your_ngrok_key")
WEBHOOK_SECRET = "your_webhook_key"

app = FastAPI()


def verify_webhook_signature(signature_data: bytes, signature: str, secret: str) -> bool:
    """Verify webhook signature for security"""
    expected_signature = hmac.new(
        secret.encode('utf-8'),
        signature_data,
        hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(expected_signature, signature)


@app.post("/api/webhooks")
async def handle_post(request: Request):
    """Handle BagelPay webhook notifications"""
    payload = await request.body()
    timestamp = request.headers.get('timestamp').encode()
    signature = request.headers.get('Bagelpay-Signature')
    # Combine payload and timestamp
    signature_data = timestamp + ".".encode() + payload

    if not verify_webhook_signature(signature_data, signature, WEBHOOK_SECRET):
        return JSONResponse(status_code=401, content={"error": "Invalid signature"})

    try:
        event = json.loads(payload)
        event_type = event.get('event_type')
        data = event.get('object')

        if event_type == 'checkout.completed':
            # handle checkout completed events
            print(event)
        elif event_type == 'checkout.failed':
            # handle checkout failed events
            print(event)
        elif event_type == 'checkout.cancel':
            # handle checkout cancelled events
            print(event)
        elif event_type == 'subscription.trialing':
            # handle subscription trialing events
            print(event)
        elif event_type == 'subscription.paid':
            # handle subscription paid events
            print(event)
        elif event_type == 'subscription.canceled':
            # handle subscription cancelled events
            print(event)
        elif event_type == 'refund.created':
            # handle refund created events
            print(event)
        else:
            print(f"Unhandled event type: {event_type}")

        return JSONResponse(status_code=200, content={"message": "Success"})
    except Exception as e:
        print(f"Webhook processing error: {e}")
        return JSONResponse(status_code=500, content={"error": "Processing failed"})


if __name__ == "__main__":
    listening_port = "8000"
    public_url = ngrok.connect(
        addr=listening_port,
        proto="http",
        hostname="stunning-crane-direct.ngrok-free.app"
    )
    print(f"ngrok Public URL: {public_url}")
    uvicorn.run(app, host="0.0.0.0", port=int(listening_port))

๐Ÿ“ž Support and Resources

Getting Help

Last updated