Building Instructions
# Agent Orchestration Hub - Build Instructions
## System Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ Agent Orchestration Hub │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Connector A │ │ Connector B │ │ Connector N │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │ Prompt │ │ │ │ Prompt │ │ │ │ Prompt │ │ │
│ │ │Schedule │ │ │ │Schedule │ │ │ │Schedule │ │ │
│ │ │Claude │ │ │ │Claude │ │ │ │Claude │ │ │
│ │ │Executor │ │ │ │Executor │ │ │ │Executor │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Core Database Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DB Manager │ │ Scheduler │ │ Git Backup │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
## Core Components
### 1. Database Schema (`src/db/schema.sql`)
```sql
-- NOTE: this script is executed on every application start, so every
-- statement must be idempotent (hence IF NOT EXISTS).

-- Connectors registry
CREATE TABLE IF NOT EXISTS connectors (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    description TEXT,
    prompt_template TEXT NOT NULL,
    schedule_cron TEXT,                 -- NULL means "not scheduled"
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Execution history
CREATE TABLE IF NOT EXISTS executions (
    id TEXT PRIMARY KEY,
    connector_id TEXT NOT NULL,
    status TEXT CHECK (status IN ('pending', 'running', 'success', 'failed')),
    input_data TEXT,                    -- JSON-encoded input
    output_data TEXT,                   -- JSON-encoded output
    error_message TEXT,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    FOREIGN KEY (connector_id) REFERENCES connectors(id)
);
-- System configuration
CREATE TABLE IF NOT EXISTS config (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
### 2. Database Manager (`src/db/manager.py`)
```python
"""Database management with git backup and initialization."""
import sqlite3
import subprocess
import threading
import time
from pathlib import Path
from typing import Optional, Dict, Any
from contextlib import contextmanager
class DatabaseManager:
    """Own the SQLite database file and mirror it into a git repository.

    The manager applies the schema, hands out short-lived connections and
    runs a background thread that periodically snapshots the database into
    ``git_repo_path`` and commits the snapshot.
    """

    def __init__(self, db_path: Path, git_repo_path: Path):
        """Remember the paths; no I/O happens until :meth:`initialize`.

        Args:
            db_path: Location of the SQLite database file.
            git_repo_path: Directory used as the git backup repository.
        """
        self.db_path = db_path
        self.git_repo_path = git_repo_path
        self.backup_interval = 300  # 5 minutes
        self._backup_thread = None
        self._shutdown = threading.Event()

    def initialize(self):
        """Initialize database and start backup service."""
        self._create_database()
        self._init_git_repo()
        self._start_backup_service()

    def _create_database(self):
        """Apply ``schema.sql`` (located next to this module) to the database.

        The script runs on every call, so its statements must be idempotent.
        """
        schema_path = Path(__file__).parent / "schema.sql"
        schema = schema_path.read_text(encoding="utf-8")
        with self.get_connection() as conn:
            conn.executescript(schema)

    @contextmanager
    def get_connection(self):
        """Yield a connection that commits on success and always closes.

        Rows are returned as ``sqlite3.Row``. If the ``with`` body raises,
        the commit is skipped and closing the connection discards the
        pending transaction, so partial writes are never persisted.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            # Without an explicit commit sqlite3 rolls back pending DML on close.
            conn.commit()
        finally:
            conn.close()

    def _init_git_repo(self):
        """Initialize git repository for backups (first run only)."""
        if not (self.git_repo_path / ".git").exists():
            subprocess.run(["git", "init"], cwd=self.git_repo_path, check=True)
            subprocess.run(["git", "config", "user.email", "system@localhost"],
                           cwd=self.git_repo_path, check=True)
            subprocess.run(["git", "config", "user.name", "Agent System"],
                           cwd=self.git_repo_path, check=True)

    def _backup_to_git(self):
        """Snapshot the database into the repository and commit it.

        Best effort: failures are reported on stdout and never raised, so a
        broken backup cannot take the service down.
        """
        try:
            snapshot_path = self.git_repo_path / "backup.db"
            # Use SQLite's online backup API instead of a raw file copy so the
            # snapshot is consistent even while other connections are writing.
            with self.get_connection() as source:
                target = sqlite3.connect(snapshot_path)
                try:
                    source.backup(target)
                finally:
                    target.close()
            # Commit changes
            subprocess.run(["git", "add", "."], cwd=self.git_repo_path, check=True)
            result = subprocess.run(
                ["git", "commit", "-m", f"Auto backup {time.time()}"],
                cwd=self.git_repo_path, capture_output=True, text=True)
            # Exit code 1 means "nothing to commit"; anything else is a failure.
            if result.returncode not in (0, 1):
                print(f"Backup failed: git commit exited with "
                      f"{result.returncode}: {result.stderr.strip()}")
        except (subprocess.CalledProcessError, sqlite3.Error, OSError) as e:
            print(f"Backup failed: {e}")

    def _start_backup_service(self):
        """Start background backup service."""
        def backup_loop():
            # Event.wait returns False on timeout (time for a backup) and
            # True once shutdown was requested, which ends the loop.
            while not self._shutdown.wait(self.backup_interval):
                self._backup_to_git()

        self._backup_thread = threading.Thread(target=backup_loop, daemon=True)
        self._backup_thread.start()

    def shutdown(self):
        """Stop the backup thread and take one final backup."""
        self._shutdown.set()
        if self._backup_thread:
            self._backup_thread.join(timeout=5)
        self._backup_to_git()  # Final backup
```
### 3. Scheduler (`src/scheduler/manager.py`)
```python
"""Task scheduler with cron support."""
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from typing import Callable, Dict, Any
import logging
class SchedulerManager:
    """Bridge between connector rows in the database and APScheduler jobs."""

    def __init__(self, db_manager):
        """Create a background scheduler bound to ``db_manager``."""
        self.db_manager = db_manager
        self.scheduler = BackgroundScheduler()
        self.logger = logging.getLogger(__name__)

    def start(self):
        """Start the scheduler, then register every stored connector job."""
        self.scheduler.start()
        self._load_scheduled_connectors()

    def stop(self):
        """Shut the scheduler down."""
        self.scheduler.shutdown()

    def add_connector_job(self, connector_id: str, cron_expression: str,
                          executor_func: Callable):
        """Register (or replace) the cron job of one connector.

        The job calls ``executor_func(connector_id)``. Any failure, including
        an invalid cron expression, is logged and not raised.
        """
        try:
            cron_trigger = CronTrigger.from_crontab(cron_expression)
            job_options = {
                "func": executor_func,
                "trigger": cron_trigger,
                "id": f"connector_{connector_id}",
                "args": [connector_id],
                "replace_existing": True,
            }
            self.scheduler.add_job(**job_options)
        except Exception as e:
            self.logger.error(f"Failed to schedule connector {connector_id}: {e}")
            return
        self.logger.info(f"Scheduled connector {connector_id} with cron: {cron_expression}")

    def remove_connector_job(self, connector_id: str):
        """Drop the job of one connector; failures are logged as warnings."""
        job_id = f"connector_{connector_id}"
        try:
            self.scheduler.remove_job(job_id)
        except Exception as e:
            self.logger.warning(f"Failed to remove job for connector {connector_id}: {e}")

    def _load_scheduled_connectors(self):
        """Schedule every active connector that has a cron expression."""
        with self.db_manager.get_connection() as conn:
            cursor = conn.execute("""
                SELECT id, schedule_cron
                FROM connectors
                WHERE is_active = 1 AND schedule_cron IS NOT NULL
            """)
            scheduled_rows = cursor.fetchall()

        # Imported here rather than at module level, as in the original layout.
        from ..connectors.executor import ConnectorExecutor

        run_connector = ConnectorExecutor(self.db_manager).execute_connector
        for row in scheduled_rows:
            self.add_connector_job(row['id'], row['schedule_cron'], run_connector)
```
### 4. Connector Base (`src/connectors/base.py`)
```python
"""Base connector interface."""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import uuid
from datetime import datetime
class BaseConnector(ABC):
    """Abstract description of a connector: metadata plus prompt hooks."""

    # Attributes copied verbatim into the dictionary built by ``to_dict``.
    _STORED_FIELDS = ('id', 'name', 'description', 'prompt_template', 'schedule_cron')

    def __init__(self, connector_id: str, name: str, description: str,
                 prompt_template: str, schedule_cron: Optional[str] = None):
        """Store the connector metadata on the instance."""
        self.id = connector_id
        self.name, self.description = name, description
        self.prompt_template = prompt_template
        self.schedule_cron = schedule_cron

    @abstractmethod
    def prepare_prompt(self, input_data: Dict[str, Any]) -> str:
        """Build the prompt text from ``input_data``."""

    @abstractmethod
    def process_output(self, claude_output: str) -> Dict[str, Any]:
        """Turn Claude's raw output into a dictionary."""

    def to_dict(self) -> Dict[str, Any]:
        """Return the connector as a dictionary for database storage.

        ``is_active`` is always ``True`` in the result.
        """
        record = {field: getattr(self, field) for field in self._STORED_FIELDS}
        record['is_active'] = True
        return record
```
### 5. Claude Code Executor (`src/connectors/claude_executor.py`)
```python
"""Claude Code CLI interface."""
import subprocess
import tempfile
import json
from pathlib import Path
from typing import Dict, Any, Optional
class ClaudeCodeExecutor:
    """Run prompts through the Claude Code command-line tool."""

    def __init__(self, claude_code_path: str = "claude-code"):
        """Remember which executable to invoke.

        Args:
            claude_code_path: Name or path of the CLI executable.
        """
        # NOTE(review): the published Claude Code CLI is normally invoked as
        # `claude -p <prompt> --output-format json`; confirm that the installed
        # binary really is `claude-code` and accepts `--prompt-file`.
        self.claude_code_path = claude_code_path

    def execute_prompt(self, prompt: str, context_files: Optional[Dict[str, str]] = None) -> str:
        """Execute ``prompt`` with the CLI inside a throw-away directory.

        Args:
            prompt: Prompt text; written to ``prompt.txt`` in the temp directory.
            context_files: Optional mapping of file name to text content,
                written into the same directory before the CLI starts.

        Returns:
            The CLI's standard output with surrounding whitespace stripped.

        Raises:
            RuntimeError: If the CLI cannot be started, times out (300 s) or
                exits with a non-zero status.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Write context files if provided. UTF-8 is explicit so non-ASCII
            # text does not depend on the service's locale.
            for filename, content in (context_files or {}).items():
                (temp_path / filename).write_text(content, encoding="utf-8")

            # Write prompt to file
            prompt_file = temp_path / "prompt.txt"
            prompt_file.write_text(prompt, encoding="utf-8")

            command = [
                self.claude_code_path,
                "--prompt-file", str(prompt_file),
                "--output-format", "json",
            ]
            # Only the process launch is guarded: the exit-status error below
            # must not be re-wrapped as a generic "execution failed" error.
            try:
                result = subprocess.run(
                    command,
                    capture_output=True,
                    text=True,
                    cwd=temp_dir,
                    timeout=300,  # 5 minute timeout
                )
            except subprocess.TimeoutExpired as exc:
                raise RuntimeError("Claude Code execution timed out") from exc
            except Exception as exc:
                raise RuntimeError(f"Claude Code execution failed: {exc}") from exc

            if result.returncode != 0:
                raise RuntimeError(f"Claude Code failed: {result.stderr}")
            return result.stdout.strip()
```
### 6. Connector Executor (`src/connectors/executor.py`)
```python
"""Connector execution engine."""
import uuid
import json
from datetime import datetime
from typing import Dict, Any, Optional
from .claude_executor import ClaudeCodeExecutor
from .registry import ConnectorRegistry
class ConnectorExecutor:
    """Run a connector end to end and record the outcome in ``executions``."""

    def __init__(self, db_manager):
        """Wire up the Claude executor and the connector registry.

        Args:
            db_manager: Object exposing ``get_connection()`` as a context
                manager that yields a SQLite connection.
        """
        self.db_manager = db_manager
        self.claude_executor = ClaudeCodeExecutor()
        self.registry = ConnectorRegistry(db_manager)

    def execute_connector(self, connector_id: str, input_data: Optional[Dict[str, Any]] = None) -> str:
        """Execute a connector by ID.

        Args:
            connector_id: Identifier looked up in the registry.
            input_data: Optional values handed to the connector's
                ``prepare_prompt``; an empty dict is used when omitted.

        Returns:
            The generated execution ID (a UUID4 string).

        Raises:
            ValueError: If the connector is not found.
            Exception: Any error from prompt preparation, the Claude run or
                output processing is recorded as ``failed`` and re-raised.
        """
        execution_id = str(uuid.uuid4())
        try:
            # Record execution start
            self._record_execution_start(execution_id, connector_id, input_data)

            # Get connector
            connector = self.registry.get_connector(connector_id)
            if not connector:
                raise ValueError(f"Connector {connector_id} not found")

            # Prepare prompt, run it, post-process the answer
            prompt = connector.prepare_prompt(input_data or {})
            claude_output = self.claude_executor.execute_prompt(prompt)
            processed_output = connector.process_output(claude_output)

            # Record success
            self._record_execution_complete(execution_id, 'success', processed_output)
            return execution_id
        except Exception as e:
            # Record failure
            self._record_execution_complete(execution_id, 'failed', None, str(e))
            raise

    def _record_execution_start(self, execution_id: str, connector_id: str,
                                input_data: Optional[Dict[str, Any]]):
        """Insert a ``running`` row for this execution and commit it."""
        with self.db_manager.get_connection() as conn:
            conn.execute("""
                INSERT INTO executions (id, connector_id, status, input_data, started_at)
                VALUES (?, ?, 'running', ?, ?)
            """, (execution_id, connector_id, json.dumps(input_data), datetime.utcnow()))
            # sqlite3 discards uncommitted DML when the connection closes, so
            # commit here rather than relying on the connection provider.
            conn.commit()

    def _record_execution_complete(self, execution_id: str, status: str,
                                   output_data: Optional[Dict[str, Any]],
                                   error_message: Optional[str] = None):
        """Store the final status, output and error of an execution and commit."""
        with self.db_manager.get_connection() as conn:
            conn.execute("""
                UPDATE executions
                SET status = ?, output_data = ?, error_message = ?, completed_at = ?
                WHERE id = ?
            """, (status, json.dumps(output_data), error_message, datetime.utcnow(), execution_id))
            conn.commit()
```
### 7. System Main (`src/main.py`)
```python
"""Main system entry point."""
import signal
import sys
from pathlib import Path
from .db.manager import DatabaseManager
from .scheduler.manager import SchedulerManager
from .connectors.executor import ConnectorExecutor
import logging
def main():
    """Start the hub and block until SIGINT/SIGTERM.

    With ``--init-only`` on the command line the database and the backup
    repository are initialized, one backup is taken, and the function
    returns without starting the scheduler.

    Exits with status 1 if startup fails and with 0 after a signal-triggered
    shutdown.
    """
    # Setup logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # The deployment steps run `python -m src.main --init-only`.
    init_only = "--init-only" in sys.argv[1:]

    # Initialize paths
    data_dir = Path("./data")
    data_dir.mkdir(exist_ok=True)
    db_path = data_dir / "agents.db"
    git_backup_path = data_dir / "backups"
    git_backup_path.mkdir(exist_ok=True)

    # Initialize components
    db_manager = DatabaseManager(db_path, git_backup_path)
    scheduler_manager = SchedulerManager(db_manager)

    try:
        # Start system
        logger.info("Starting Agent Orchestration Hub...")
        db_manager.initialize()
        if init_only:
            # Stop the backup thread again and take the final backup.
            db_manager.shutdown()
            logger.info("Initialization complete")
            return
        scheduler_manager.start()
        logger.info("System started successfully")

        # Graceful shutdown handler
        def signal_handler(signum, frame):
            logger.info("Shutting down...")
            scheduler_manager.stop()
            db_manager.shutdown()
            # SystemExit is not an Exception, so the handler below lets it pass.
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Keep alive (signal.pause is only available on Unix)
        signal.pause()
    except Exception as e:
        # logger.exception keeps the traceback, which a bare message loses.
        logger.exception(f"System startup failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
```
## Testing Framework
### Test Structure (`tests/`)
```
tests/
├── unit/
│ ├── test_db_manager.py
│ ├── test_scheduler.py
│ ├── test_connectors.py
│ └── test_claude_executor.py
├── integration/
│ ├── test_full_workflow.py
│ └── test_system_startup.py
└── fixtures/
├── sample_connectors.json
└── test_data.sql
```
### Example Test (`tests/unit/test_db_manager.py`)
```python
"""Test database manager."""
import pytest
import tempfile
from pathlib import Path
from src.db.manager import DatabaseManager
@pytest.fixture
def temp_db():
    """Yield a DatabaseManager rooted in a temporary directory.

    The manager is shut down before the directory is removed, so a backup
    thread started by ``initialize()`` never outlives the files it uses.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        db_path = Path(temp_dir) / "test.db"
        git_path = Path(temp_dir) / "git"
        git_path.mkdir()
        manager = DatabaseManager(db_path, git_path)
        try:
            yield manager
        finally:
            manager.shutdown()
def test_database_initialization(temp_db):
    """All three schema tables are present after initialization."""
    temp_db.initialize()
    with temp_db.get_connection() as conn:
        # List every table known to SQLite
        cursor = conn.execute("""
            SELECT name FROM sqlite_master WHERE type='table'
        """)
        found = [row['name'] for row in cursor.fetchall()]
    for expected in ('connectors', 'executions', 'config'):
        assert expected in found
def test_git_backup(temp_db):
    """A backup run leaves both the repository and the snapshot on disk."""
    temp_db.initialize()
    temp_db._backup_to_git()
    repo_dir = temp_db.git_repo_path
    # Repository metadata first, then the copied database
    for entry in (".git", "backup.db"):
        assert (repo_dir / entry).exists()
```
## Installation & Setup
### 1. System Dependencies
```bash
# Ubuntu server setup (curl is needed for the Claude Code installer below)
sudo apt update
sudo apt install -y python3.11 python3.11-venv git sqlite3 curl
# Create project directory
mkdir -p /opt/agent-hub
cd /opt/agent-hub
# Create virtual environment
python3.11 -m venv venv
source venv/bin/activate
# Install Claude Code with the official installer.
# -f makes curl fail on HTTP errors instead of piping an error page into bash.
# NOTE(review): confirm the name of the installed executable and pass it as
# `claude_code_path` if it is not `claude-code`.
curl -fsSL https://claude.ai/install.sh | bash
```
### 2. Python Dependencies (`requirements.txt`)
```
APScheduler==3.10.4
pytest==7.4.3
pytest-asyncio==0.21.1
```
### 3. System Service (`systemd/agent-hub.service`)
```ini
[Unit]
Description=Agent Orchestration Hub
After=network.target

[Service]
Type=simple
User=agent-hub
WorkingDirectory=/opt/agent-hub
# Keep the system directories on PATH: the service spawns `git` and the
# Claude Code CLI via subprocess, and neither lives in the virtualenv.
Environment=PATH=/opt/agent-hub/venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
ExecStart=/opt/agent-hub/venv/bin/python -m src.main
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
```
## Build & Deploy
```bash
# 1. Clone and setup
git clone <repo> /opt/agent-hub
cd /opt/agent-hub
python3.11 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# 2. Run tests
pytest tests/ -v
# 3. Initialize system
python -m src.main --init-only
# 4. Create the service account named in the unit file (User=agent-hub)
#    and give it ownership of the install, including ./data from step 3
id -u agent-hub >/dev/null 2>&1 || \
    sudo useradd --system --home-dir /opt/agent-hub --shell /usr/sbin/nologin agent-hub
sudo chown -R agent-hub: /opt/agent-hub
# 5. Install service
sudo cp systemd/agent-hub.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable agent-hub
sudo systemctl start agent-hub
# 6. Verify
sudo systemctl status agent-hub
```
## Extension Points
1. **New Connector Types**: Extend `BaseConnector`
2. **Multiple LLM Support**: Extend `ClaudeCodeExecutor`
3. **Web Interface**: Add FastAPI REST endpoints
4. **Monitoring**: Add Prometheus metrics
5. **Distributed**: Add Redis/RabbitMQ message queue