Documentation Index
Fetch the complete documentation index at: https://docs.plugged.in/llms.txt
Use this file to discover all available pages before exploring further.
Python SDK
The official Python SDK for interacting with Plugged.in’s Library API. Features both synchronous and asynchronous clients with comprehensive type hints using Pydantic models.
PyPI Package View on PyPI Registry
GitHub Repository Source code and examples
Installation
Requirements
Python 3.8 or higher
httpx for HTTP client
pydantic for data validation
typing-extensions for Python < 3.11
Quick Start
Synchronous Client
from pluggedinkit import PluggedInClient
# Initialize the client
client = PluggedInClient(
api_key = "your-api-key" ,
# base_url defaults to https://plugged.in
)
# List documents
documents = client.documents.list()
print ( f "Found { documents.total } documents" )
# Search documents
results = client.documents.search( "machine learning" )
for result in results.results:
print ( f " { result.title } - Relevance: { result.relevance_score } " )
# Query knowledge base
answer = client.rag.ask_question( "What are the main features?" )
print (answer)
Asynchronous Client
import asyncio
from pluggedinkit import AsyncPluggedInClient
async def main ():
# Initialize async client
async with AsyncPluggedInClient( api_key = "your-api-key" ) as client:
# List documents
documents = await client.documents.list()
print ( f "Found { documents.total } documents" )
# Query RAG
answer = await client.rag.ask_question( "What's new in the project?" )
print (answer)
asyncio.run(main())
Authentication
Get your API key from your Plugged.in Profile:
import os
from pluggedinkit import PluggedInClient
# Using environment variables (recommended)
client = PluggedInClient(
api_key = os.environ[ "PLUGGEDIN_API_KEY" ],
base_url = os.environ.get( "PLUGGEDIN_BASE_URL" , "https://plugged.in" ),
timeout = 60.0 , # 60 seconds
max_retries = 5 ,
debug = True # Enable debug logging
)
# Update API key at runtime
client.set_api_key( "new-api-key" )
Core Features
Document Management
List Documents
from pluggedinkit.types import DocumentFilters, DocumentSource
filters = DocumentFilters(
source = DocumentSource. AI_GENERATED ,
tags = [ "report" , "analysis" ],
category = "documentation" ,
date_from = "2024-01-01T00:00:00Z" ,
date_to = "2024-12-31T23:59:59Z" ,
model_provider = "anthropic" ,
sort = "date_desc" ,
limit = 20 ,
offset = 0
)
response = client.documents.list(filters)
for doc in response.documents:
print ( f " { doc.title } ( { doc.file_size } bytes)" )
print ( f " Created: { doc.created_at } " )
print ( f " Tags: { ', ' .join(doc.tags) } " )
Get Document
# Get document metadata only
doc = client.documents.get( "document-id" )
print ( f "Title: { doc.title } " )
print ( f "Version: { doc.version } " )
# Get document with content and version history
doc_with_content = client.documents.get(
"document-id" ,
include_content = True ,
include_versions = True
)
print (doc_with_content.content)
# Access version history
for version in doc_with_content.versions:
print ( f "Version { version.number } : { version.created_at } " )
Search Documents
from pluggedinkit.types import SearchFilters
filters = SearchFilters(
model_provider = "anthropic" ,
date_from = "2024-01-01T00:00:00Z" ,
tags = [ "finance" , "q4" ]
)
results = client.documents.search(
"quarterly report" ,
filters = filters,
limit = 10 ,
offset = 0
)
for result in results.results:
print ( f " { result.title } " )
print ( f " Relevance: { result.relevance_score :.2f} " )
print ( f " Snippet: { result.snippet } " )
print ( f " Tags: { ', ' .join(result.tags) } " )
Create AI-Generated Document
metadata = {
"format" : "md" ,
"category" : "documentation" ,
"tags" : [ "api" , "guide" ],
"model" : {
"name" : "claude-3-opus" ,
"provider" : "anthropic" ,
"version" : "20240229"
},
"prompt" : "Create a comprehensive API integration guide" ,
"context" : "User requested comprehensive API documentation" ,
"updateReason" : "Initial creation" ,
"changesFromPrompt" : "Created new guide as requested" ,
"changeSummary" : "New comprehensive API integration guide" ,
"conversationContext" : [
{ "role" : "user" , "content" : "Create an API guide" },
{ "role" : "assistant" , "content" : "I'll create a comprehensive guide..." }
],
"sourceDocuments" : [ "doc-123" , "doc-456" ],
"visibility" : "workspace" ,
"sessionId" : "session-789" ,
"generationParams" : {
"temperature" : 0.7 ,
"maxTokens" : 4000 ,
"topP" : 0.9
},
# Dynamic fields - add any custom metadata
"customField" : "Any custom value" ,
"projectMetadata" : {
"team" : "engineering" ,
"version" : "1.0.0"
}
}
doc = client.documents.create(
title = "API Integration Guide" ,
content = "# API Integration Guide \n\n ## Introduction \n\n ..." ,
metadata = metadata
)
print ( f "Created document: { doc.id } " )
print ( f "Version: { doc.version } " )
# Access all metadata including custom fields
print ( f "All metadata: { doc.ai_metadata. __dict__ } " )
Update Document
from pluggedinkit.types import UpdateDocumentRequest, UpdateOperation
request = UpdateDocumentRequest(
operation = UpdateOperation. APPEND ,
content = " \n\n ## New Section \n\n Additional content here." ,
metadata = {
"changeSummary" : "Added implementation details" ,
"model" : {
"name" : "gpt-4" ,
"provider" : "openai" ,
"version" : "0613"
},
"updateReason" : "Added new section on authentication" ,
"changesFromPrompt" : "User requested additional auth details" ,
"prompt" : "Add a section about OAuth 2.0 authentication" ,
"lastUpdatedBy" : {
"name" : "gpt-4" ,
"provider" : "openai" ,
"version" : "0613"
},
"lastUpdateTimestamp" : "2024-01-15T10:30:00Z" ,
"conversationContext" : [
{ "role" : "user" , "content" : "Add OAuth 2.0 section" },
{ "role" : "assistant" , "content" : "Adding authentication details..." }
],
# Custom metadata fields
"customUpdateMetadata" : {
"reviewedBy" : "security-team" ,
"complianceCheck" : "passed"
}
}
)
response = client.documents.update( "document-id" , request)
print ( f "Updated to version { response.version } " )
print ( f "Updated at: { response.updated_at } " )
RAG Operations
Simple Query
# Basic question answering
answer = client.rag.ask_question( "What are the deployment procedures?" )
print (answer)
# Get storage statistics
stats = client.rag.get_storage_stats( user_id = "user-id-from-dashboard" )
print ( f "Documents: { stats.documents_count } " )
print ( f "Estimated storage: { stats.estimated_storage_mb } MB" )
Query with Sources
result = client.rag.query_with_sources( "Explain the authentication flow" )
print ( f "Answer: { result[ 'answer' ] } " )
print ( " \n Sources:" )
for source in result[ "sources" ]:
print ( f "- { source.name } ( { source.id } )" )
Find Relevant Documents
documents = client.rag.find_relevant_documents(
"user authentication" ,
limit = 5 # Top 5 documents
)
for doc in documents:
print ( f "- { doc.name } ( { doc.id } )" )
File Uploads
Important: Direct file uploads are no longer available via the public API.
The client.uploads helpers now raise descriptive errors so legacy code fails
fast. Use the Plugged.in web interface or upcoming ingestion pipeline for
binary assets. AI-generated documents remain available through
client.documents.create and client.uploads.upload_document.
Async Operations
Concurrent Operations
import asyncio
from pluggedinkit import AsyncPluggedInClient
async def document_operations ():
async with AsyncPluggedInClient( api_key = "your-key" ) as client:
# Concurrent document searches
search_tasks = [
client.documents.search( "api" ),
client.documents.search( "guide" ),
client.documents.search( "tutorial" )
]
results = await asyncio.gather( * search_tasks)
for result in results:
print ( f "Found { result.total } matches" )
Async RAG Queries
async def rag_operations ():
async with AsyncPluggedInClient( api_key = "your-key" ) as client:
# Multiple queries concurrently
questions = [
"What is the authentication process?" ,
"How do I deploy the application?" ,
"What are the API rate limits?"
]
tasks = [client.rag.ask_question(q) for q in questions]
answers = await asyncio.gather( * tasks)
for q, a in zip (questions, answers):
print ( f "Q: { q } " )
print ( f "A: { a } \n " )
asyncio.run(rag_operations())
Error Handling
The SDK provides typed exceptions for better error handling:
from pluggedinkit import (
PluggedInError,
AuthenticationError,
RateLimitError,
NotFoundError,
ValidationError
)
try :
doc = client.documents.get( "invalid-id" )
except AuthenticationError as e:
print ( "Invalid API key - please check your credentials" )
# Refresh API key logic
except RateLimitError as e:
print ( f "Rate limited. Retry after { e.retry_after } seconds" )
# Implement exponential backoff
time.sleep(e.retry_after)
except NotFoundError as e:
print ( f "Document not found: { e.resource_id } " )
except ValidationError as e:
print ( f "Validation error: { e.details } " )
# Fix validation issues
except PluggedInError as e:
print ( f "API error: { e } " )
# Generic error handling
PAP Agents
The SDK provides full support for managing PAP (Plugged.in Agent Protocol) agents:
Creating Agents
result = client.agents.create({
'name' : 'my-agent' ,
'description' : 'My autonomous PAP agent' ,
'resources' : {
'cpu_request' : '100m' ,
'memory_request' : '256Mi' ,
'cpu_limit' : '1000m' ,
'memory_limit' : '1Gi'
}
})
print ( f "Agent created: { result[ 'agent' ][ 'uuid' ] } " )
print ( f "DNS: { result[ 'agent' ][ 'dns_name' ] } " ) # my-agent.is.plugged.in
Listing and Managing Agents
# List all agents
agents = client.agents.list()
# Get agent details
details = client.agents.get(agent_id)
print (details[ 'recentHeartbeats' ])
print (details[ 'recentMetrics' ])
print (details[ 'lifecycleEvents' ])
# Export agent data
export_data = client.agents.export(
agent_id,
include_telemetry = True ,
telemetry_limit = 1000
)
# Delete agent
client.agents.delete(agent_id)
Heartbeats and Metrics
CRITICAL : Heartbeats contain ONLY liveness data. Resource telemetry goes in metrics (separate channel). This separation is PAP’s zombie prevention superpower.
# Send heartbeat (liveness only). Uptime should be wall-clock time since the
# agent started — not CPU time, which is what time.process_time() reports.
start_time = time.monotonic()  # record once at agent startup
client.agents.heartbeat(
    agent_id,
    mode='IDLE',  # or 'EMERGENCY', 'SLEEP'
    uptime_seconds=time.monotonic() - start_time
)
# Send metrics (resource telemetry)
client.agents.metrics(
agent_id,
cpu_percent = 12.5 ,
memory_mb = 128 ,
requests_handled = 45 ,
custom_metrics = {
'queue_depth' : 3 ,
'cache_hit_rate' : 0.85
}
)
Async Agent Operations
from pluggedinkit import AsyncPluggedInClient
async with AsyncPluggedInClient( api_key = "your-key" ) as client:
# Create agent
result = await client.agents.create({
'name' : 'async-agent' ,
'description' : 'Async PAP agent'
})
# Send heartbeat and metrics concurrently
await asyncio.gather(
client.agents.heartbeat(agent_id, mode = 'IDLE' , uptime_seconds = 330 ),
client.agents.metrics(agent_id, cpu_percent = 10 , memory_mb = 100 , requests_handled = 20 )
)
Agent States
Agents follow the PAP lifecycle state machine:
NEW → PROVISIONED → ACTIVE ↔ DRAINING → TERMINATED
                      ↓ (error)
                    KILLED
# Poll until agent is active
import time
while True :
details = client.agents.get(agent_id)
if details[ 'agent' ][ 'state' ] == 'ACTIVE' :
print ( 'Agent is ready!' )
break
time.sleep( 5 )
See PAP Agents Documentation for complete details on the Plugged.in Agent Protocol.
Type Safety with Pydantic
The SDK uses Pydantic models for comprehensive type safety:
from pluggedinkit.types import (
Document,
DocumentFilters,
DocumentSource,
DocumentVisibility,
DocumentCategory,
UpdateOperation,
ModelInfo
)
# All types are validated
filters = DocumentFilters(
source = DocumentSource. AI_GENERATED ,
limit = 10 # Validated: must be > 0 and <= 100
)
# IDE autocomplete and type checking
doc: Document = client.documents.get( "id" )
print (doc.title) # Type-safe attribute access
# Pydantic validation
try :
invalid_filters = DocumentFilters(
limit =- 1 # Will raise ValidationError
)
except ValidationError as e:
print ( f "Validation failed: { e } " )
Advanced Configuration
Custom HTTP Client
import httpx
from pluggedinkit import PluggedInClient
# Custom httpx client with proxy
http_client = httpx.Client(
proxies = "http://proxy.example.com:8080" ,
verify = False , # Disable SSL verification (not recommended)
timeout = httpx.Timeout( 30.0 )
)
client = PluggedInClient(
api_key = "your-api-key" ,
base_url = "https://plugged.in" ,
http_client = http_client
)
Retry Configuration
client = PluggedInClient(
api_key = "your-api-key" ,
max_retries = 10 ,
retry_delay = 1.0 , # Initial delay in seconds
retry_backoff = 2.0 # Exponential backoff factor
)
Logging Configuration
import logging
# Enable debug logging
logging.basicConfig( level = logging. DEBUG )
client = PluggedInClient(
api_key = "your-api-key" ,
debug = True
)
# SDK will log requests, responses, and errors
Environment Variables
Store your API key securely using environment variables:
# .env file
PLUGGEDIN_API_KEY = your-api-key
PLUGGEDIN_BASE_URL = https://plugged.in
import os
from dotenv import load_dotenv
from pluggedinkit import PluggedInClient
load_dotenv()
client = PluggedInClient(
api_key = os.environ[ "PLUGGEDIN_API_KEY" ],
base_url = os.environ.get( "PLUGGEDIN_BASE_URL" )
)
Rate Limiting
The SDK automatically handles rate limiting with exponential backoff:
API Endpoints : 60 requests per minute
Document Search : 10 requests per hour for AI documents
RAG Queries : Subject to plan limits
# The SDK will automatically retry with backoff
for i in range ( 100 ):
try :
doc = client.documents.get( f "doc- { i } " )
except RateLimitError as e:
print ( f "Rate limited, waiting { e.retry_after } seconds" )
time.sleep(e.retry_after)
Testing
Unit Testing with Mock
import unittest
from unittest.mock import Mock, patch
from pluggedinkit import PluggedInClient
class TestDocuments ( unittest . TestCase ):
def setUp ( self ):
self .client = PluggedInClient( api_key = "test-key" )
@patch ( 'pluggedinkit.client.httpx.Client' )
def test_list_documents ( self , mock_http ):
# Mock response
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"documents" : [],
"total" : 0
}
mock_http.return_value.request.return_value = mock_response
# Test
result = self .client.documents.list()
self .assertEqual(result.total, 0 )
Integration Testing
import pytest
from pluggedinkit import PluggedInClient
@pytest.fixture
def client ():
return PluggedInClient( api_key = "test-api-key" )
def test_document_lifecycle ( client ):
# Create document
doc = client.documents.create(
"Test Document" ,
"Test content" ,
{ "tags" : [ "test" ]}
)
assert doc.id is not None
# Update document
update_result = client.documents.update(
doc.id,
UpdateDocumentRequest(
operation = UpdateOperation. APPEND ,
content = " \n Updated content"
)
)
assert update_result.version > doc.version
# Delete document
client.documents.delete(doc.id)
Examples
Complete working examples are available in the GitHub repository:
API Reference
Client Classes
| Class | Description |
|-------|-------------|
| PluggedInClient | Synchronous API client |
| AsyncPluggedInClient | Asynchronous API client |
Configuration Options
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| api_key | str | required | Your Plugged.in API key |
| base_url | str | https://plugged.in | API base URL |
| timeout | float | 60.0 | Request timeout in seconds |
| max_retries | int | 5 | Maximum retry attempts |
| debug | bool | False | Enable debug logging |
Document Methods
| Method | Description |
|--------|-------------|
| list(filters) | List documents with optional filters |
| get(id, include_content, include_versions) | Get a specific document |
| search(query, filters, limit, offset) | Search documents semantically |
| create(title, content, metadata) | Create a new document |
| update(id, request) | Update an existing document |
| delete(id) | Delete a document |
RAG Methods
| Method | Description |
|--------|-------------|
| query(query) | Returns RagResponse with answer and metadata |
| ask_question(query) | Simple RAG query (answer text only) |
| query_with_sources(query) | Query with structured source metadata |
| find_relevant_documents(query, limit) | Find relevant documents |
| check_availability() | Check RAG service availability |
| get_storage_stats(user_id) | Get storage statistics |
Upload Methods
Upload helpers now raise descriptive errors because binary ingestion is no longer available via the public API.
Clipboard Methods
| Method | Description |
|--------|-------------|
| list() | List all clipboard entries |
| get(name, idx) | Get entry by name or index |
| set(...) | Create or update named entry |
| push(...) | Push value to indexed stack |
| pop() | Pop and remove most recent entry |
| delete(name, idx) | Delete specific entry |
| clear_all() | Delete all entries |
Clipboard (Memory) Service
The Clipboard service provides persistent key-value storage for MCP tools and AI agents. Both sync and async clients are supported.
Named Access (Sync)
# Set a named entry
entry = client.clipboard.set(
name = "user_preferences" ,
value = '{"theme": "dark", "lang": "en"}' ,
content_type = "application/json" ,
encoding = "utf-8" ,
visibility = "private" ,
ttl_seconds = 86400 # 24 hours
)
print ( f "Created entry: { entry.uuid } " )
# Get by name
entry = client.clipboard.get( name = "user_preferences" )
print ( f "Value: { entry.value } " )
# Delete by name
deleted = client.clipboard.delete( name = "user_preferences" )
print ( f "Deleted: { deleted } " )
Named Access (Async)
import asyncio
from pluggedinkit import AsyncPluggedInClient
async def clipboard_operations ():
async with AsyncPluggedInClient( api_key = "your-key" ) as client:
# Set entry
entry = await client.clipboard.set(
name = "session_data" ,
value = "session info" ,
content_type = "text/plain"
)
# Get entry
entry = await client.clipboard.get( name = "session_data" )
print ( f "Session: { entry.value } " )
asyncio.run(clipboard_operations())
Indexed (Stack) Access
# Push to stack
client.clipboard.push(
value = "Step 1 result" ,
content_type = "text/plain"
)
client.clipboard.push(
value = "Step 2 result" ,
content_type = "text/plain"
)
# Get by index (0 = most recent)
latest = client.clipboard.get( idx = 0 )
print ( f "Latest: { latest.value } " ) # "Step 2 result"
# Pop removes and returns the most recent
popped = client.clipboard.pop()
print ( f "Popped: { popped.value } " ) # "Step 2 result"
List All Entries
entries = client.clipboard.list()
for entry in entries:
name_or_idx = entry.name or f "idx: { entry.idx } "
print ( f " { name_or_idx } : { entry.value } " )
if entry.expires_at:
print ( f " Expires: { entry.expires_at } " )
Pydantic Models
from pluggedinkit.types import (
ClipboardEntry,
ClipboardSetRequest,
ClipboardPushRequest,
ClipboardEncoding,
ClipboardVisibility,
ClipboardSource,
DEFAULT_CLIPBOARD_SOURCE
)
# All types are Pydantic models with validation
request = ClipboardSetRequest(
name = "cache_key" ,
value = "cached data" ,
content_type = "text/plain" ,
encoding = ClipboardEncoding. UTF8 ,
visibility = ClipboardVisibility. PRIVATE ,
ttl_seconds = 3600
)
# ClipboardSource tracks where entries originate
class ClipboardSource ( str , Enum ):
UI = "ui" # Created via web interface
SDK = "sdk" # Created via SDK (automatically set)
MCP = "mcp" # Created via MCP tools
# The SDK automatically sets source to ClipboardSource.SDK for all entries.
# DEFAULT_CLIPBOARD_SOURCE is ClipboardSource.UI for backward compatibility.
Support