Skip to main content

Python SDK

PyPI version · Python Support · License: MIT

The official Python SDK for interacting with Plugged.in’s Library API. It provides both synchronous and asynchronous clients, with comprehensive type hints built on Pydantic models.

Installation

pip install pluggedinkit

Requirements

  • Python 3.8 or higher
  • httpx for HTTP client
  • pydantic for data validation
  • typing-extensions for Python < 3.11

Quick Start

Synchronous Client

from pluggedinkit import PluggedInClient

# Initialize the client
client = PluggedInClient(
    api_key="your-api-key",
    # base_url defaults to https://plugged.in
)

# List documents
documents = client.documents.list()
print(f"Found {documents.total} documents")

# Search documents
results = client.documents.search("machine learning")
for result in results.results:
    print(f"{result.title} - Relevance: {result.relevance_score}")

# Query knowledge base
answer = client.rag.ask_question("What are the main features?")
print(answer)

Asynchronous Client

import asyncio
from pluggedinkit import AsyncPluggedInClient

async def main():
    """List the available documents, then ask the knowledge base one question."""
    async with AsyncPluggedInClient(api_key="your-api-key") as client:
        # Fetch the document listing and report its size
        listing = await client.documents.list()
        print(f"Found {listing.total} documents")

        # Ask a single RAG question and show the answer text
        reply = await client.rag.ask_question("What's new in the project?")
        print(reply)

asyncio.run(main())

Authentication

Get your API key from your Plugged.in Profile:
import os
from pluggedinkit import PluggedInClient

# Using environment variables (recommended)
client = PluggedInClient(
    api_key=os.environ["PLUGGEDIN_API_KEY"],
    base_url=os.environ.get("PLUGGEDIN_BASE_URL", "https://plugged.in"),
    timeout=60.0,  # 60 seconds
    max_retries=5,
    debug=True  # Enable debug logging
)

# Update API key at runtime
client.set_api_key("new-api-key")

Core Features

Document Management

List Documents

from pluggedinkit.types import DocumentFilters, DocumentSource

filters = DocumentFilters(
    source=DocumentSource.AI_GENERATED,
    tags=["report", "analysis"],
    category="documentation",
    date_from="2024-01-01T00:00:00Z",
    date_to="2024-12-31T23:59:59Z",
    model_provider="anthropic",
    sort="date_desc",
    limit=20,
    offset=0
)

response = client.documents.list(filters)
for doc in response.documents:
    print(f"{doc.title} ({doc.file_size} bytes)")
    print(f"  Created: {doc.created_at}")
    print(f"  Tags: {', '.join(doc.tags)}")

Get Document

# Get document metadata only
doc = client.documents.get("document-id")
print(f"Title: {doc.title}")
print(f"Version: {doc.version}")

# Get document with content and version history
doc_with_content = client.documents.get(
    "document-id",
    include_content=True,
    include_versions=True
)
print(doc_with_content.content)

# Access version history
for version in doc_with_content.versions:
    print(f"Version {version.number}: {version.created_at}")

Search Documents

from pluggedinkit.types import SearchFilters

filters = SearchFilters(
    model_provider="anthropic",
    date_from="2024-01-01T00:00:00Z",
    tags=["finance", "q4"]
)

results = client.documents.search(
    "quarterly report",
    filters=filters,
    limit=10,
    offset=0
)

for result in results.results:
    print(f"{result.title}")
    print(f"  Relevance: {result.relevance_score:.2f}")
    print(f"  Snippet: {result.snippet}")
    print(f"  Tags: {', '.join(result.tags)}")

Create AI-Generated Document

metadata = {
    "format": "md",
    "category": "documentation",
    "tags": ["api", "guide"],
    "model": {
        "name": "claude-3-opus",
        "provider": "anthropic",
        "version": "20240229"
    },
    "prompt": "Create a comprehensive API integration guide",
    "context": "User requested comprehensive API documentation",
    "updateReason": "Initial creation",
    "changesFromPrompt": "Created new guide as requested",
    "changeSummary": "New comprehensive API integration guide",
    "conversationContext": [
        {"role": "user", "content": "Create an API guide"},
        {"role": "assistant", "content": "I'll create a comprehensive guide..."}
    ],
    "sourceDocuments": ["doc-123", "doc-456"],
    "visibility": "workspace",
    "sessionId": "session-789",
    "generationParams": {
        "temperature": 0.7,
        "maxTokens": 4000,
        "topP": 0.9
    },
    # Dynamic fields - add any custom metadata
    "customField": "Any custom value",
    "projectMetadata": {
        "team": "engineering",
        "version": "1.0.0"
    }
}

doc = client.documents.create(
    title="API Integration Guide",
    content="# API Integration Guide\n\n## Introduction\n\n...",
    metadata=metadata
)

print(f"Created document: {doc.id}")
print(f"Version: {doc.version}")
# Access all metadata including custom fields
print(f"All metadata: {doc.ai_metadata.__dict__}")

Update Document

from pluggedinkit.types import UpdateDocumentRequest, UpdateOperation

request = UpdateDocumentRequest(
    operation=UpdateOperation.APPEND,
    content="\n\n## New Section\n\nAdditional content here.",
    metadata={
        "changeSummary": "Added implementation details",
        "model": {
            "name": "gpt-4",
            "provider": "openai",
            "version": "0613"
        },
        "updateReason": "Added new section on authentication",
        "changesFromPrompt": "User requested additional auth details",
        "prompt": "Add a section about OAuth 2.0 authentication",
        "lastUpdatedBy": {
            "name": "gpt-4",
            "provider": "openai",
            "version": "0613"
        },
        "lastUpdateTimestamp": "2024-01-15T10:30:00Z",
        "conversationContext": [
            {"role": "user", "content": "Add OAuth 2.0 section"},
            {"role": "assistant", "content": "Adding authentication details..."}
        ],
        # Custom metadata fields
        "customUpdateMetadata": {
            "reviewedBy": "security-team",
            "complianceCheck": "passed"
        }
    }
)

response = client.documents.update("document-id", request)
print(f"Updated to version {response.version}")
print(f"Updated at: {response.updated_at}")

RAG Operations

Simple Query

# Basic question answering
answer = client.rag.ask_question("What are the deployment procedures?")
print(answer)

# Get storage statistics
stats = client.rag.get_storage_stats(user_id="user-id-from-dashboard")
print(f"Documents: {stats.documents_count}")
print(f"Estimated storage: {stats.estimated_storage_mb} MB")

Query with Sources

result = client.rag.query_with_sources("Explain the authentication flow")

print(f"Answer: {result['answer']}")
print("\nSources:")
for source in result["sources"]:
    print(f"- {source.name} ({source.id})")

Find Relevant Documents

documents = client.rag.find_relevant_documents(
    "user authentication",
    limit=5  # Top 5 documents
)

for doc in documents:
    print(f"- {doc.name} ({doc.id})")

File Uploads

Important: Direct file uploads are no longer available via the public API. The client.uploads helpers now raise descriptive errors so legacy code fails fast. Use the Plugged.in web interface or upcoming ingestion pipeline for binary assets. AI-generated documents remain available through client.documents.create and client.uploads.upload_document.

Async Operations

Concurrent Operations

import asyncio
from pluggedinkit import AsyncPluggedInClient

async def document_operations():
    """Run three document searches concurrently and print each match count."""
    async with AsyncPluggedInClient(api_key="your-key") as client:
        # Build one search coroutine per term, then await them together
        terms = ("api", "guide", "tutorial")
        pending = [client.documents.search(term) for term in terms]
        outcomes = await asyncio.gather(*pending)

        for outcome in outcomes:
            print(f"Found {outcome.total} matches")

Async RAG Queries

async def rag_operations():
    """Ask several RAG questions concurrently and print each question/answer pair."""
    prompts = [
        "What is the authentication process?",
        "How do I deploy the application?",
        "What are the API rate limits?"
    ]

    async with AsyncPluggedInClient(api_key="your-key") as client:
        # gather() returns results in the same order the coroutines were passed
        replies = await asyncio.gather(
            *(client.rag.ask_question(prompt) for prompt in prompts)
        )

        for prompt, reply in zip(prompts, replies):
            print(f"Q: {prompt}")
            print(f"A: {reply}\n")

asyncio.run(rag_operations())

Error Handling

The SDK provides typed exceptions for better error handling:
import time  # needed for time.sleep() in the RateLimitError handler

from pluggedinkit import (
    PluggedInError,
    AuthenticationError,
    RateLimitError,
    NotFoundError,
    ValidationError
)

# Specific exceptions are handled first; PluggedInError is the generic fallback.
try:
    doc = client.documents.get("invalid-id")
except AuthenticationError as e:
    print("Invalid API key - please check your credentials")
    # Refresh API key logic
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")
    # Implement exponential backoff
    time.sleep(e.retry_after)
except NotFoundError as e:
    print(f"Document not found: {e.resource_id}")
except ValidationError as e:
    print(f"Validation error: {e.details}")
    # Fix validation issues
except PluggedInError as e:
    print(f"API error: {e}")
    # Generic error handling

PAP Agents

The SDK provides full support for managing PAP (Plugged.in Agent Protocol) agents:

Creating Agents

result = client.agents.create({
    'name': 'my-agent',
    'description': 'My autonomous PAP agent',
    'resources': {
        'cpu_request': '100m',
        'memory_request': '256Mi',
        'cpu_limit': '1000m',
        'memory_limit': '1Gi'
    }
})

print(f"Agent created: {result['agent']['uuid']}")
print(f"DNS: {result['agent']['dns_name']}")  # my-agent.is.plugged.in

Listing and Managing Agents

# List all agents
agents = client.agents.list()

# Get agent details
details = client.agents.get(agent_id)
print(details['recentHeartbeats'])
print(details['recentMetrics'])
print(details['lifecycleEvents'])

# Export agent data
export_data = client.agents.export(
    agent_id,
    include_telemetry=True,
    telemetry_limit=1000
)

# Delete agent
client.agents.delete(agent_id)

Heartbeats and Metrics

CRITICAL: Heartbeats contain ONLY liveness data. Resource telemetry goes in metrics (separate channel). This separation is PAP’s zombie prevention superpower.
import time

# Record this once, when the agent process starts.
started_at = time.monotonic()

# Send heartbeat (liveness only)
client.agents.heartbeat(
    agent_id,
    mode='IDLE',  # or 'EMERGENCY', 'SLEEP'
    # Wall-clock seconds since start. time.process_time() would report CPU
    # time only, which excludes time spent sleeping and so is not an uptime.
    uptime_seconds=time.monotonic() - started_at
)

# Send metrics (resource telemetry)
client.agents.metrics(
    agent_id,
    cpu_percent=12.5,
    memory_mb=128,
    requests_handled=45,
    custom_metrics={
        'queue_depth': 3,
        'cache_hit_rate': 0.85
    }
)

Async Agent Operations

import asyncio

from pluggedinkit import AsyncPluggedInClient


async def main():
    """Create an agent, then send its heartbeat and metrics concurrently."""
    # `async with` is only valid inside a coroutine, hence the wrapper function.
    async with AsyncPluggedInClient(api_key="your-key") as client:
        # Create agent
        result = await client.agents.create({
            'name': 'async-agent',
            'description': 'Async PAP agent'
        })
        # Use the UUID of the agent that was just created
        # (same response shape as the synchronous create example).
        agent_id = result['agent']['uuid']

        # Send heartbeat and metrics concurrently
        await asyncio.gather(
            client.agents.heartbeat(agent_id, mode='IDLE', uptime_seconds=330),
            client.agents.metrics(agent_id, cpu_percent=10, memory_mb=100, requests_handled=20)
        )

asyncio.run(main())

Agent States

Agents follow the PAP lifecycle state machine:
NEW → PROVISIONED → ACTIVE ↔ DRAINING → TERMINATED
                       ↓ (error)
                     KILLED
# Poll until agent is active
import time

# States in the lifecycle diagram from which an agent never becomes ACTIVE.
TERMINAL_STATES = {'TERMINATED', 'KILLED'}
POLL_INTERVAL_SECONDS = 5
deadline = time.monotonic() + 300  # stop waiting after 5 minutes

while True:
    details = client.agents.get(agent_id)
    state = details['agent']['state']
    if state == 'ACTIVE':
        print('Agent is ready!')
        break
    # Without these two checks the loop would spin forever on a dead agent.
    if state in TERMINAL_STATES:
        raise RuntimeError(f"Agent reached terminal state {state} before becoming ACTIVE")
    if time.monotonic() >= deadline:
        raise TimeoutError("Timed out waiting for agent to become ACTIVE")
    time.sleep(POLL_INTERVAL_SECONDS)
See PAP Agents Documentation for complete details on the Plugged.in Agent Protocol.

Type Safety with Pydantic

The SDK uses Pydantic models for comprehensive type safety:
from pluggedinkit.types import (
    Document,
    DocumentFilters,
    DocumentSource,
    DocumentVisibility,
    DocumentCategory,
    UpdateOperation,
    ModelInfo
)

# All types are validated
filters = DocumentFilters(
    source=DocumentSource.AI_GENERATED,
    limit=10  # Validated: must be > 0 and <= 100
)

# IDE autocomplete and type checking
doc: Document = client.documents.get("id")
print(doc.title)  # Type-safe attribute access

# Pydantic validation
# NOTE(review): ValidationError is not imported in this snippet. Constructing a
# Pydantic model with invalid data normally raises pydantic.ValidationError;
# confirm whether that class or pluggedinkit.ValidationError is intended here
# and import it explicitly.
try:
    invalid_filters = DocumentFilters(
        limit=-1  # Will raise ValidationError
    )
except ValidationError as e:
    print(f"Validation failed: {e}")

Advanced Configuration

Custom HTTP Client

import httpx
from pluggedinkit import PluggedInClient

# Custom httpx client with proxy.
# httpx 0.26+ takes a single `proxy=` URL; the older `proxies=` argument was
# deprecated in 0.26 and removed in 0.28, where it raises TypeError.
http_client = httpx.Client(
    proxy="http://proxy.example.com:8080",
    verify=False,  # Disable SSL verification (not recommended)
    timeout=httpx.Timeout(30.0)
)

client = PluggedInClient(
    api_key="your-api-key",
    base_url="https://plugged.in",
    http_client=http_client
)

Retry Configuration

client = PluggedInClient(
    api_key="your-api-key",
    max_retries=10,
    retry_delay=1.0,  # Initial delay in seconds
    retry_backoff=2.0  # Exponential backoff factor
)

Logging Configuration

import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

client = PluggedInClient(
    api_key="your-api-key",
    debug=True
)

# SDK will log requests, responses, and errors

Environment Variables

Store your API key securely using environment variables:
# .env file
PLUGGEDIN_API_KEY=your-api-key
PLUGGEDIN_BASE_URL=https://plugged.in
import os
from dotenv import load_dotenv
from pluggedinkit import PluggedInClient

# Load PLUGGEDIN_* variables from the .env file into the process environment
load_dotenv()

client = PluggedInClient(
    api_key=os.environ["PLUGGEDIN_API_KEY"],
    # Fall back to the default host so base_url is never None when the
    # variable is unset (matches the Authentication example).
    base_url=os.environ.get("PLUGGEDIN_BASE_URL", "https://plugged.in")
)

Rate Limiting

The SDK automatically handles rate limiting with exponential backoff:
  • API Endpoints: 60 requests per minute
  • Document Search: 10 requests per hour for AI documents
  • RAG Queries: Subject to plan limits
# The SDK will automatically retry with backoff
import time

from pluggedinkit import RateLimitError

for i in range(100):
    try:
        doc = client.documents.get(f"doc-{i}")
    except RateLimitError as e:
        # Wait for the period carried on the exception before moving on
        print(f"Rate limited, waiting {e.retry_after} seconds")
        time.sleep(e.retry_after)

Testing

Unit Testing with Mock

import unittest
from unittest.mock import Mock, patch
from pluggedinkit import PluggedInClient

class TestDocuments(unittest.TestCase):
    """Unit tests for the documents API using a mocked httpx.Client."""

    def setUp(self):
        # Start the patch BEFORE constructing the SDK client. A method-level
        # @patch is only active while the test method runs, so a client built
        # in setUp would otherwise have been created against the real
        # httpx.Client (if the SDK instantiates it in __init__).
        patcher = patch('pluggedinkit.client.httpx.Client')
        self.mock_http = patcher.start()
        self.addCleanup(patcher.stop)  # undo the patch even if the test fails

        self.client = PluggedInClient(api_key="test-key")

    def test_list_documents(self):
        """An empty listing response yields a result whose total is 0."""
        # Mock response
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "documents": [],
            "total": 0
        }
        self.mock_http.return_value.request.return_value = mock_response

        # Test
        result = self.client.documents.list()
        self.assertEqual(result.total, 0)

Integration Testing

import pytest
from pluggedinkit import PluggedInClient
from pluggedinkit.types import UpdateDocumentRequest, UpdateOperation

@pytest.fixture
def client():
    """Provide a PluggedInClient for the integration tests."""
    return PluggedInClient(api_key="test-api-key")

def test_document_lifecycle(client):
    """Create a document, append to it, and delete it again."""
    # Create document
    doc = client.documents.create(
        "Test Document",
        "Test content",
        {"tags": ["test"]}
    )
    assert doc.id is not None

    try:
        # Update document
        update_result = client.documents.update(
            doc.id,
            UpdateDocumentRequest(
                operation=UpdateOperation.APPEND,
                content="\nUpdated content"
            )
        )
        assert update_result.version > doc.version
    finally:
        # Delete document even when the update or assertion fails, so a
        # failed run does not leave test data behind.
        client.documents.delete(doc.id)

Examples

Complete working examples are available in the SDK's GitHub repository.

API Reference

Client Classes

| Class | Description |
| --- | --- |
| PluggedInClient | Synchronous API client |
| AsyncPluggedInClient | Asynchronous API client |

Configuration Options

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | str | required | Your Plugged.in API key |
| base_url | str | https://plugged.in | API base URL |
| timeout | float | 60.0 | Request timeout in seconds |
| max_retries | int | 5 | Maximum retry attempts |
| debug | bool | False | Enable debug logging |

Document Methods

| Method | Description |
| --- | --- |
| list(filters) | List documents with optional filters |
| get(id, include_content, include_versions) | Get a specific document |
| search(query, filters, limit, offset) | Search documents semantically |
| create(title, content, metadata) | Create a new document |
| update(id, request) | Update an existing document |
| delete(id) | Delete a document |

RAG Methods

| Method | Description |
| --- | --- |
| query(query) | Returns RagResponse with answer and metadata |
| ask_question(query) | Simple RAG query (answer text only) |
| query_with_sources(query) | Query with structured source metadata |
| find_relevant_documents(query, limit) | Find relevant documents |
| check_availability() | Check RAG service availability |
| get_storage_stats(user_id) | Get storage statistics |

Upload Methods

Upload helpers now raise descriptive errors because binary ingestion is no longer available via the public API.

Clipboard Methods

| Method | Description |
| --- | --- |
| list() | List all clipboard entries |
| get(name, idx) | Get entry by name or index |
| set(...) | Create or update named entry |
| push(...) | Push value to indexed stack |
| pop() | Pop and remove most recent entry |
| delete(name, idx) | Delete specific entry |
| clear_all() | Delete all entries |

Clipboard (Memory) Service

The Clipboard service provides persistent key-value storage for MCP tools and AI agents. Both sync and async clients are supported.

Named Access (Sync)

# Set a named entry
entry = client.clipboard.set(
    name="user_preferences",
    value='{"theme": "dark", "lang": "en"}',
    content_type="application/json",
    encoding="utf-8",
    visibility="private",
    ttl_seconds=86400  # 24 hours
)
print(f"Created entry: {entry.uuid}")

# Get by name
entry = client.clipboard.get(name="user_preferences")
print(f"Value: {entry.value}")

# Delete by name
deleted = client.clipboard.delete(name="user_preferences")
print(f"Deleted: {deleted}")

Named Access (Async)

import asyncio
from pluggedinkit import AsyncPluggedInClient

async def clipboard_operations():
    """Store a named clipboard entry, then read it back and print its value."""
    async with AsyncPluggedInClient(api_key="your-key") as client:
        # Write the entry; its return value is not used in this example
        await client.clipboard.set(
            name="session_data",
            value="session info",
            content_type="text/plain"
        )

        # Read the same entry back by name
        stored = await client.clipboard.get(name="session_data")
        print(f"Session: {stored.value}")

asyncio.run(clipboard_operations())

Indexed (Stack) Access

# Push to stack
client.clipboard.push(
    value="Step 1 result",
    content_type="text/plain"
)

client.clipboard.push(
    value="Step 2 result",
    content_type="text/plain"
)

# Get by index (0 = most recent)
latest = client.clipboard.get(idx=0)
print(f"Latest: {latest.value}")  # "Step 2 result"

# Pop removes and returns the most recent
popped = client.clipboard.pop()
print(f"Popped: {popped.value}")  # "Step 2 result"

List All Entries

entries = client.clipboard.list()
for entry in entries:
    name_or_idx = entry.name or f"idx:{entry.idx}"
    print(f"{name_or_idx}: {entry.value}")
    if entry.expires_at:
        print(f"  Expires: {entry.expires_at}")

Pydantic Models

from pluggedinkit.types import (
    ClipboardEntry,
    ClipboardSetRequest,
    ClipboardPushRequest,
    ClipboardEncoding,
    ClipboardVisibility,
    ClipboardSource,
    DEFAULT_CLIPBOARD_SOURCE
)

# All types are Pydantic models with validation
request = ClipboardSetRequest(
    name="cache_key",
    value="cached data",
    content_type="text/plain",
    encoding=ClipboardEncoding.UTF8,
    visibility=ClipboardVisibility.PRIVATE,
    ttl_seconds=3600
)

# ClipboardSource tracks where entries originate
# NOTE(review): this definition appears to be shown for reference only. The
# same name is imported from pluggedinkit.types at the top of this snippet,
# and `Enum` is not imported here, so running it as-is would need
# `from enum import Enum` and would shadow the imported class.
class ClipboardSource(str, Enum):
    """String-valued enum naming where a clipboard entry was created."""

    UI = "ui"      # Created via web interface
    SDK = "sdk"    # Created via SDK (automatically set)
    MCP = "mcp"    # Created via MCP tools

# The SDK automatically sets source to ClipboardSource.SDK for all entries.
# DEFAULT_CLIPBOARD_SOURCE is ClipboardSource.UI for backward compatibility.

Support