LimaCharlie Python SDK Documentation

Table of Contents

  1. Overview
  2. Installation
  3. Authentication
  4. Core Classes
  5. Organization
  6. Sensor Management
  7. Detection and Response Rules
  8. Real-time Data Streaming
  9. Artifacts
  10. Hive Operations
  11. Search (LCQL)
  12. Extensions
  13. Infrastructure as Code
  14. Error Handling
  15. Complete Examples

Overview

The LimaCharlie Python SDK is a programmatic interface to the LimaCharlie SecOps Cloud Platform. It covers sensor management, detection and response rules, real-time event streaming, artifact collection, LCQL search, Hive storage, and infrastructure-as-code configuration.

Key Features

  • Organization Management: Create, configure, and manage LimaCharlie organizations
  • Sensor Operations: Deploy, monitor, and control endpoint sensors
  • Detection & Response: Create and manage detection rules with automated response actions
  • Real-time Streaming: Receive events, detections, and audit logs in real-time
  • Artifact Management: Collect and manage forensic artifacts
  • LCQL Queries: Execute, validate, and save search queries
  • Hive Storage: Key-value configuration store for rules, secrets, playbooks, and more
  • Infrastructure as Code: Pull/push org configuration as version-controlled YAML files

SDK Version

Current version: 5.0.0

Installation

Requirements

  • Python 3.9 or higher
  • pip package manager

Install via pip

pip install limacharlie

Install from source

git clone https://github.com/refractionPOINT/python-limacharlie.git
cd python-limacharlie
pip install -e ".[dev]"

Dependencies

Core dependencies (automatically installed):

  • requests - HTTP client
  • click - CLI framework
  • pyyaml - YAML parsing
  • jmespath - Data filtering
  • rich - Terminal output formatting
  • cryptography - Encryption

Authentication

Authentication Methods

The LimaCharlie SDK supports multiple authentication methods:

  1. API Key Authentication (Recommended for automation)
  2. OAuth Authentication (For user-based access)
  3. Environment-based Authentication (Using configuration files)

Credential Resolution Order

Credentials are resolved in priority order (highest first):

  1. Explicit parameters passed to Client()
  2. LC_OID, LC_API_KEY, LC_UID environment variables
  3. Named environment from LC_CURRENT_ENV (or 'default')
  4. Default credentials in ~/.limacharlie config file
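
The priority order above can be sketched as a small pure function. This is a minimal illustration, not the SDK's internal code: `resolve_credentials` is a hypothetical helper, the `config` argument is assumed to be the already-parsed contents of the credentials file, and treating a missing named environment as a fall-through to the top-level defaults is an assumption.

```python
from typing import Any, Mapping, Optional


def resolve_credentials(
    explicit: Optional[Mapping[str, str]],
    environ: Mapping[str, str],
    config: Mapping[str, Any],
) -> dict:
    """Return the first credential set found, highest priority first.

    Hypothetical helper that mirrors the documented order only.
    """
    # 1. Explicit parameters win outright.
    if explicit and explicit.get("api_key"):
        return dict(explicit)

    # 2. LC_OID / LC_API_KEY / LC_UID environment variables.
    if environ.get("LC_API_KEY"):
        creds = {"api_key": environ["LC_API_KEY"]}
        for var, key in (("LC_OID", "oid"), ("LC_UID", "uid")):
            if environ.get(var):
                creds[key] = environ[var]
        return creds

    # 3. Named environment from LC_CURRENT_ENV (or 'default').
    env_name = environ.get("LC_CURRENT_ENV", "default")
    named = config.get("env", {}).get(env_name)
    if named:
        return dict(named)

    # 4. Top-level default credentials from the config file.
    if config.get("api_key"):
        return {k: config[k] for k in ("oid", "uid", "api_key") if k in config}

    raise LookupError("no LimaCharlie credentials found")
```

Passing the environment and config in as arguments, rather than reading `os.environ` directly, keeps the sketch easy to exercise in a unit test.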

API Key Authentication

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization

# Direct API key authentication
client = Client(
    oid='YOUR_ORGANIZATION_ID',  # UUID format
    api_key='YOUR_API_KEY'       # UUID format
)
org = Organization(client)
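
Since both values are documented as UUIDs, a pre-flight check can catch a placeholder or a truncated paste before any request is made. The sketch below uses only the standard library; `is_uuid` is a hypothetical helper and is not part of the SDK.

```python
import uuid


def is_uuid(value: str) -> bool:
    """Return True when value is a UUID in canonical 8-4-4-4-12 form."""
    try:
        # uuid.UUID also accepts dash-less and braced forms, so compare the
        # canonical rendering with the input to insist on the dashed form.
        return str(uuid.UUID(value)) == value.lower()
    except (ValueError, AttributeError, TypeError):
        return False


print(is_uuid("12345678-1234-1234-1234-123456789012"))  # True
print(is_uuid("YOUR_API_KEY"))                          # False
```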

Environment-based Authentication

The SDK can read credentials from environment variables or configuration files:

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization

# Using environment variables
# Set: LC_OID="your-org-id"
# Set: LC_API_KEY="your-api-key"
client = Client()
org = Organization(client)

# Using a specific environment from config file
client = Client(environment='production')
org = Organization(client)

Configuration File Format

Create a file at ~/.limacharlie, or point the LC_CREDS_FILE environment variable at a different path:

# Default credentials
oid: "12345678-1234-1234-1234-123456789012"
api_key: "87654321-4321-4321-4321-210987654321"

# Named environments
env:
  production:
    oid: "12345678-1234-1234-1234-123456789012"
    api_key: "87654321-4321-4321-4321-210987654321"

  staging:
    oid: "87654321-4321-4321-4321-210987654321"
    api_key: "12345678-1234-1234-1234-123456789012"
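
A minimal sketch of locating the credentials file and picking a named environment from its parsed contents. Both function names are hypothetical, and the `config` argument is assumed to be the mapping produced by a YAML parser such as `yaml.safe_load`; parsing is left out so the sketch needs only the standard library.

```python
import os
from pathlib import Path
from typing import Mapping


def credentials_path(environ: Mapping[str, str] = os.environ) -> Path:
    """LC_CREDS_FILE overrides the default ~/.limacharlie location."""
    override = environ.get("LC_CREDS_FILE")
    if override:
        return Path(override).expanduser()
    return Path.home() / ".limacharlie"


def select_environment(config: Mapping[str, dict], name: str) -> dict:
    """Pick one entry from the `env:` section of a parsed credentials file."""
    try:
        return config["env"][name]
    except KeyError:
        raise KeyError(f"environment {name!r} is not defined in the credentials file") from None
```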

User-based Authentication

from limacharlie.client import Client

# Authenticate as a user instead of organization
client = Client(
    uid='USER_ID',
    api_key='USER_API_KEY'
)

Core Classes

Architecture

limacharlie
├── client.Client              # HTTP client with JWT and retry logic
├── sdk/
│   ├── organization.Organization  # Main entry point for org operations
│   ├── sensor.Sensor             # Individual sensor management
│   ├── hive.Hive                 # Key-value configuration store
│   ├── hive.HiveRecord           # Individual hive record
│   ├── dr_rules.DRRules          # D&R rule management
│   ├── fp_rules.FPRules          # False positive rule management
│   ├── search.Search             # LCQL query execution
│   ├── firehose.Firehose         # Real-time push streaming
│   ├── spout.Spout               # Real-time pull streaming
│   ├── extensions.Extensions     # Extension management
│   ├── artifacts.Artifacts       # Artifact collection/retrieval
│   ├── payloads.Payloads         # Payload management
│   ├── replay.Replay             # Rule replay
│   ├── configs.Configs           # Infrastructure as Code
│   └── ...                       # Additional modules
└── errors                        # Custom exception hierarchy

Client

The Client handles HTTP communication, JWT generation/refresh, retry with exponential backoff, and rate limit awareness.

from limacharlie.client import Client

# Basic usage
client = Client(oid='ORG_ID', api_key='API_KEY')

# As context manager
with Client(oid='ORG_ID', api_key='API_KEY') as client:
    data = client.request("GET", "sensors")

Organization

The Organization class is the primary entry point for interacting with a LimaCharlie organization.

Core Methods

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization

client = Client(oid='ORG_ID', api_key='API_KEY')
org = Organization(client)

# Get organization information
org_info = org.get_info()
# Returns: {'oid': '...', 'name': '...', ...}

# Get organization URLs
urls = org.get_urls()
# Returns: {'webapp': 'https://...', 'api': 'https://...', 'hooks': '...', ...}

# Get usage statistics
stats = org.get_stats()

# Get organization configuration
value = org.get_config('config_name')

# Set organization configuration
org.set_config('config_name', 'new_value')

# Get organization errors
errors = org.get_errors()

# Dismiss an error
org.dismiss_error('component_name')

# Get MITRE ATT&CK coverage report
mitre = org.get_mitre_report()

# Get event schemas
schemas = org.get_schemas(platform='windows')
schema = org.get_schema('NEW_PROCESS')

# Current identity and permissions
identity = org.who_am_i()

User Management

# List organization users
users = org.get_users()
# Returns: ['user1@example.com', 'user2@example.com', ...]

# Add a user
org.add_user('new_user@example.com')

# Remove a user
org.remove_user('user@example.com')

# Get user permissions
permissions = org.get_user_permissions()

# Grant a permission
org.add_user_permission('user@example.com', 'dr.set')

# Revoke a permission
org.remove_user_permission('user@example.com', 'dr.set')

# Set a predefined role (Owner, Administrator, Operator, Viewer, Basic)
org.set_user_role('user@example.com', 'Operator')

Organization Lifecycle

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization

client = Client(uid='USER_ID', api_key='USER_API_KEY')
org = Organization(client)

# List accessible organizations
orgs = org.list_accessible_orgs()

# Check name availability
availability = Organization.check_name(client, 'my-new-org')

# Create a new organization
new_org = Organization.create_org(client, 'my-new-org', location='us')

Sensor Management

The Sensor class provides detailed control over individual sensors.

Listing and Getting Sensors

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization
from limacharlie.sdk.sensor import Sensor

client = Client()
org = Organization(client)

# List all sensors (returns generator of dicts)
for sensor_info in org.list_sensors():
    print(sensor_info['sid'], sensor_info.get('hostname'))

# List with a selector filter
for sensor_info in org.list_sensors(selector='plat == windows'):
    print(sensor_info['sid'])

# Get list of online sensor IDs
online_sids = org.get_online_sensors()
for sid in online_sids:
    print(sid)

# Get a specific sensor object
sensor = Sensor(org, 'SENSOR_ID')

Sensor Properties and Methods

sensor = Sensor(org, 'SENSOR_ID')

# Get full sensor information
info = sensor.get_info()
# Returns: {'hostname': '...', 'plat': ..., 'arch': ..., ...}

# Check platform
if sensor.is_windows:
    print("Windows sensor")
elif sensor.is_linux:
    print("Linux sensor")
elif sensor.is_macos:
    print("macOS sensor")

# Get hostname
hostname = sensor.hostname

# Check if sensor is online
is_online = sensor.is_online()

# Wait for sensor to come online (blocking)
came_online = sensor.wait_online(timeout=300)  # 5 minutes

Sending Tasks to Sensors

# Send a single task (fire-and-forget)
sensor.task('os_processes')

# Send multiple tasks
sensor.task(['os_info', 'os_processes', 'os_services'])

# Task with investigation ID
sensor.task('os_processes', inv_id='investigation-123')

Tag Management

# Get sensor tags
tags = sensor.get_tags()
# Returns: ['production', 'web-server', ...]

# Add a tag
sensor.add_tag('critical')

# Add a tag with TTL (auto-removed after N seconds)
sensor.add_tag('investigating', ttl=3600)

# Remove a tag
sensor.remove_tag('test')

Network Isolation

# Isolate sensor from network
sensor.isolate()

# Re-join sensor to network
sensor.rejoin()

# Check isolation status
is_isolated = sensor.is_isolated()

Sensor Events

import time

# Get historical events for a sensor
end = int(time.time())
start = end - 3600  # 1 hour ago

for event in sensor.get_events(start=start, end=end, event_type='NEW_PROCESS', limit=100):
    print(event)

# Get event overview/timeline
overview = sensor.get_overview(start=start, end=end)
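
The start/end arithmetic above recurs in several sections. One way to factor it out is a small helper; `time_window` is a hypothetical name, and the sketch assumes the time arguments are Unix epoch seconds, as in the snippet above.

```python
import time
from typing import Optional, Tuple


def time_window(seconds_back: int, now: Optional[int] = None) -> Tuple[int, int]:
    """Return (start, end) in Unix epoch seconds covering the last seconds_back seconds."""
    if seconds_back <= 0:
        raise ValueError("seconds_back must be positive")
    end = int(time.time()) if now is None else now
    return end - seconds_back, end


# A fixed `now` makes the result reproducible.
start, end = time_window(3600, now=1_700_003_600)
print(start, end)  # 1700000000 1700003600
```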

Sensor Lifecycle

# Delete sensor permanently
sensor.delete()

Detection and Response Rules

D&R rules can be managed through the Hive system or the DRRules convenience class.

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization
from limacharlie.sdk.hive import Hive, HiveRecord

client = Client()
org = Organization(client)

# Access D&R rules hive
hive = Hive(org, "dr-general")

# List all rules
rules = hive.list()
for name, record in rules.items():
    print(name, record.enabled)

# Get a specific rule
record = hive.get("my-detection-rule")
print(record.data)  # {'detect': {...}, 'respond': [...]}

# Create or update a rule
new_rule = HiveRecord(
    name="my-new-rule",
    data={
        "detect": {
            "event": "NEW_PROCESS",
            "op": "contains",
            "path": "event/COMMAND_LINE",
            "value": "mimikatz"
        },
        "respond": [
            {"action": "report", "name": "mimikatz-detected"}
        ]
    }
)
hive.set(new_rule)

# Delete a rule
hive.delete("my-old-rule")
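
To make the shape of a detect block concrete, the sketch below evaluates a `contains` rule against a sample event locally. It is an approximation for illustration and is not the LimaCharlie rule engine: the helper names are hypothetical, the `event` field of the rule is ignored, and treating matches as case sensitive unless `case sensitive` is set to False is an assumption.

```python
from collections.abc import Mapping
from typing import Any, Optional


def get_path(event: Mapping, path: str) -> Optional[Any]:
    """Walk a slash-separated path such as 'event/COMMAND_LINE' through nested dicts."""
    node: Any = event
    for key in path.split("/"):
        if not isinstance(node, Mapping) or key not in node:
            return None
        node = node[key]
    return node


def matches_contains(detect: Mapping, event: Mapping) -> bool:
    """Local approximation of a 'contains' detect block."""
    if detect.get("op") != "contains":
        raise ValueError("this sketch only understands op == 'contains'")
    value = get_path(event, detect["path"])
    if not isinstance(value, str):
        return False
    needle = detect["value"]
    # Assumption: case sensitive unless the rule explicitly says otherwise.
    if not detect.get("case sensitive", True):
        value, needle = value.lower(), needle.lower()
    return needle in value


detect = {"event": "NEW_PROCESS", "op": "contains",
          "path": "event/COMMAND_LINE", "value": "mimikatz"}
sample = {"event": {"COMMAND_LINE": "C:\\tools\\mimikatz.exe sekurlsa::logonpasswords"}}
print(matches_contains(detect, sample))  # True
```

For testing rules against real or historical data, the Replay class described later in this document is the appropriate tool; a local check like this only helps with reasoning about paths and values.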

Using DRRules Convenience Class

from limacharlie.sdk.dr_rules import DRRules

dr = DRRules(org)

# List rules (general namespace)
rules = dr.list()

# Get a rule
rule = dr.get("my-rule")

# Create a rule
dr.create("my-rule", {
    "detect": {"event": "NEW_PROCESS", "op": "exists", "path": "event/FILE_PATH"},
    "respond": [{"action": "report", "name": "process-detected"}]
})

# Delete a rule
dr.delete("my-rule")

False Positive Rules

from limacharlie.sdk.fp_rules import FPRules

fp = FPRules(org)

# List FP rules
fp_rules = fp.list()

# Create an FP rule
fp.create("my-fp-rule", {
    "op": "is",
    "path": "detect/event/FILE_PATH",
    "value": "C:\\Windows\\System32\\svchost.exe"
})

Replay (Testing Rules Against Historical Data)

from limacharlie.sdk.replay import Replay

replay = Replay(org)

# Run a rule against historical data
result = replay.run(
    rule_name="my-rule",
    start=1700000000,
    end=1700100000,
)

# Test a rule against sample events
events = [{"routing": {...}, "event": {...}}]
result = replay.scan_events(
    events,
    rule_content={"detect": {...}, "respond": [...]},
)

Real-time Data Streaming

Spout (Pull-based Streaming)

The Spout pulls data from stream.limacharlie.io over HTTPS. Because the connection is outbound, it works through NATs and proxies, which makes it best suited to short-term, ad-hoc streaming.

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization
from limacharlie.sdk.spout import Spout

client = Client()
org = Organization(client)

# Stream events
spout = Spout(org, data_type="event", tag="production")

try:
    while True:
        data = spout.get(timeout=5)
        if data is not None:
            print(data)
finally:
    spout.shutdown()
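
The loop above runs until interrupted. For ad-hoc use it can help to bound it by item count and elapsed time. The sketch below is a generic helper, not an SDK feature: `drain` is a hypothetical name, and it assumes only that the supplied callable returns None when nothing arrived, as the `if data is not None` check above implies. A stand-in source replaces the Spout so the sketch runs offline.

```python
import time
from collections import deque
from typing import Any, Callable, List, Optional


def drain(get: Callable[[], Optional[Any]], max_items: int, max_seconds: float) -> List[Any]:
    """Call get() until max_items have arrived or max_seconds have elapsed."""
    deadline = time.monotonic() + max_seconds
    items: List[Any] = []
    while len(items) < max_items and time.monotonic() < deadline:
        data = get()
        if data is not None:
            items.append(data)
    return items


# Stand-in source: yields queued items, with None marking a quiet interval.
_pending = deque([{"n": 1}, None, {"n": 2}, {"n": 3}])


def fake_get() -> Optional[dict]:
    return _pending.popleft() if _pending else None


first_two = drain(fake_get, max_items=2, max_seconds=1.0)
print(first_two)  # [{'n': 1}, {'n': 2}]
```

With a real Spout, the callable would be something like `lambda: spout.get(timeout=5)`, still wrapped in try/finally so that `spout.shutdown()` runs.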

Firehose (Push-based Streaming)

The Firehose starts a TLS server that LimaCharlie connects to and pushes data into. It is best for large-scale, long-running streaming.

from limacharlie.sdk.firehose import Firehose

# Create a firehose listener
fh = Firehose(
    org,
    listen_on="0.0.0.0:4443",
    data_type="event",
    name="my-firehose",
    public_dest="1.2.3.4:4443",
    max_buffer=1024,
    tag="production",
)

try:
    while True:
        data = fh.get(timeout=5)
        if data is not None:
            print(data)
finally:
    fh.shutdown()

Artifacts

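import time

from limacharlie.sdk.artifacts import Artifacts

artifacts = Artifacts(org)

# Time range to search: the last hour, as Unix epoch seconds
end_time = int(time.time())
start_time = end_time - 3600

# List artifacts
result = artifacts.list(sid='SENSOR_ID', start=start_time, end=end_time)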
for artifact in result.get('artifacts', []):
    print(artifact)

# Get download URL for an artifact
url = artifacts.get_url('ARTIFACT_ID')

Hive Operations

The Hive is LimaCharlie's key-value storage system used for D&R rules, secrets, playbooks, SOPs, lookups, and more.

from limacharlie.sdk.hive import Hive, HiveRecord

# Access a hive (e.g., secrets)
hive = Hive(org, "secret")

# List all records
records = hive.list()
for name, record in records.items():
    print(name, record.data)

# Get a specific record
record = hive.get("my-secret")
print(record.data)

# Get metadata only (without data payload)
metadata = hive.get_metadata("my-secret")

# Create or update a record
new_record = HiveRecord(
    name="my-secret",
    data={"secret": "my-secret-value"},
    enabled=True,
    tags=["automation"],
    comment="API key for external service",
)
hive.set(new_record)

# Delete a record
hive.delete("old-record")
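
The listing loop above prints `record.data`, which for the secret hive is the secret itself. When output may end up in logs, a masking step is worth considering. The sketch below is a generic standard-library helper; `redact` is a hypothetical name and not part of the SDK.

```python
from typing import Any


def redact(data: Any, keep: int = 2) -> Any:
    """Recursively mask string values, leaving only the first `keep` characters."""
    if isinstance(data, dict):
        return {key: redact(value, keep) for key, value in data.items()}
    if isinstance(data, list):
        return [redact(value, keep) for value in data]
    if isinstance(data, str):
        return data[:keep] + "*" * max(len(data) - keep, 0)
    # Numbers, booleans and None pass through unchanged.
    return data


masked = redact({"secret": "my-secret-value", "rotations": 3})
print(masked)
```

In the loop above this would read `print(name, redact(record.data))`.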

Search (LCQL)

from limacharlie.sdk.search import Search
import time

search = Search(org)

end = int(time.time())
start = end - 3600  # 1 hour ago

# Execute a query (returns generator)
for result in search.execute(
    query="event.FILE_PATH ends with .exe",
    start_time=start,
    end_time=end,
    stream="event",
    limit=100,
):
    print(result)

# Validate a query
validation = search.validate(
    query="event.FILE_PATH ends with .exe",
    start_time=start,
    end_time=end,
    stream="event",
)

# Estimate query cost
estimate = search.estimate(
    query="event.FILE_PATH ends with .exe",
    start_time=start,
    end_time=end,
    stream="event",
)

# Saved queries are managed through Hive
from limacharlie.sdk.hive import Hive, HiveRecord
query_hive = Hive(org, "query")
query_hive.set(HiveRecord(
    name="my-query",
    data={"query": "event.FILE_PATH ends with .exe", "stream": "event"},
    enabled=True,
))
saved = query_hive.list()
query_hive.delete("my-query")

Extensions

from limacharlie.sdk.extensions import Extensions

ext = Extensions(org)

# List subscribed extensions
subscribed = ext.list_subscribed()

# List all available extensions
available = ext.get_all()

# Subscribe to an extension
ext.subscribe('ext-reliable-tasking')

# Unsubscribe from an extension
ext.unsubscribe('ext-reliable-tasking')

# Get extension schema
schema = ext.get_schema('ext-reliable-tasking')

# Make a request to an extension
response = ext.request(
    extension_name='ext-reliable-tasking',
    action='task',
    data={
        'task': 'os_version',
        'selector': 'plat == windows',
        'ttl': 3600,
    },
)

Infrastructure as Code

from limacharlie.sdk.configs import Configs

configs = Configs(org)

# Pull organization configuration to a local file
configs.fetch_to_file(file_path='lc_conf.yaml')

# Push configuration from a local file (dry run)
configs.push_from_file(file_path='lc_conf.yaml', is_dry_run=True)

# Push with force (remove resources not in config)
configs.push_from_file(file_path='lc_conf.yaml', is_force=True)

Error Handling

Exception Hierarchy

The SDK uses a structured exception hierarchy with actionable suggestions:

from limacharlie.errors import (
    LimaCharlieError,       # Base exception (exit code 1)
    AuthenticationError,     # Auth failures (exit code 2)
    NotFoundError,           # Resource not found (exit code 3)
    ValidationError,         # Input validation (exit code 4)
    RateLimitError,          # Rate limit hit (exit code 5)
    PermissionDeniedError,   # Permission denied (exit code 2)
    ApiError,                # General API errors (exit code 1)
    ConfigError,             # Configuration errors (exit code 1)
)
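
A script that wraps the SDK can reuse the exit codes listed above. The sketch below keys on exception class names because this document does not say whether the SDK classes expose the code as an attribute; `EXIT_CODES` and `exit_code_for` are hypothetical names, and the local `NotFoundError` is only a stand-in so the example runs without the SDK installed.

```python
# Exit codes copied from the comments in the import list above.
EXIT_CODES = {
    "AuthenticationError": 2,
    "PermissionDeniedError": 2,
    "NotFoundError": 3,
    "ValidationError": 4,
    "RateLimitError": 5,
}


def exit_code_for(exc: BaseException) -> int:
    """Map an exception to a process exit code, defaulting to 1."""
    # Walk the MRO so subclasses of a listed exception inherit its code.
    for cls in type(exc).__mro__:
        code = EXIT_CODES.get(cls.__name__)
        if code is not None:
            return code
    return 1


class NotFoundError(Exception):
    """Local stand-in for limacharlie.errors.NotFoundError."""


print(exit_code_for(NotFoundError("no such sensor")))  # 3
print(exit_code_for(RuntimeError("unexpected")))       # 1
```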

Usage

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization
from limacharlie.sdk.sensor import Sensor
from limacharlie.errors import (
    AuthenticationError,
    NotFoundError,
    RateLimitError,
    LimaCharlieError,
)

try:
    client = Client(oid='ORG_ID', api_key='API_KEY')
    org = Organization(client)
    sensor = Sensor(org, 'SENSOR_ID')
    sensor.task('os_info')
except AuthenticationError as e:
    print(f"Auth failed: {e}")
    # Suggestion: Run 'limacharlie auth login' to configure credentials
except NotFoundError as e:
    print(f"Not found: {e}")
except RateLimitError as e:
    print(f"Rate limited, retry after: {e.retry_after}s")
except LimaCharlieError as e:
    print(f"Error: {e}")
    if e.suggestion:
        print(f"Suggestion: {e.suggestion}")

Built-in Retry Logic

The Client automatically retries on HTTP 429 (rate limit) and 504 (gateway timeout) with exponential backoff. No manual retry logic is needed for transient errors.
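
For readers unfamiliar with the pattern, exponential backoff on 429 and 504 can be sketched as follows. This illustrates the general technique and is not the Client's actual implementation: the base delay, cap, retry count, and function names are all assumptions, and `send` stands in for whatever performs the HTTP request.

```python
import time
from typing import Any, Callable, Iterator, Tuple

RETRYABLE_STATUSES = {429, 504}


def backoff_delays(retries: int, base: float = 1.0, cap: float = 30.0) -> Iterator[float]:
    """Yield base * 2**attempt seconds for each retry, never more than cap."""
    for attempt in range(retries):
        yield min(cap, base * (2 ** attempt))


def call_with_retry(
    send: Callable[[], Tuple[int, Any]],
    retries: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[int, Any]:
    """Call send(); while it returns 429 or 504, wait and try again."""
    status, body = send()
    for delay in backoff_delays(retries):
        if status not in RETRYABLE_STATUSES:
            break
        sleep(delay)
        status, body = send()
    return status, body


print(list(backoff_delays(4)))  # [1.0, 2.0, 4.0, 8.0]
```

The `sleep` parameter is injectable so the behavior can be tested without actually waiting.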

Complete Examples

Example 1: Automated Sensor Inventory

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization
from limacharlie.sdk.sensor import Sensor

client = Client()
org = Organization(client)

# Build a sensor inventory
inventory = {}
for sensor_info in org.list_sensors():
    sid = sensor_info['sid']
    sensor = Sensor(org, sid)
    info = sensor.get_info()
    tags = sensor.get_tags()

    inventory[sid] = {
        'hostname': info.get('hostname', 'unknown'),
        'platform': info.get('plat', 'unknown'),
        'tags': tags,
        'online': sensor.is_online(),
    }

print(f"Total sensors: {len(inventory)}")
for sid, data in inventory.items():
    print(f"  {data['hostname']} ({sid[:8]}...) - {data['platform']} - tags: {data['tags']}")
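
Once the inventory dictionary is built, a summary is straightforward. The sketch below assumes the dictionary shape produced above; `summarize` is a hypothetical helper, and the sample data, including the platform strings, is made up for illustration.

```python
from collections import Counter
from typing import Any, Dict


def summarize(inventory: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Count sensors in total, online, and per platform."""
    by_platform = Counter(entry["platform"] for entry in inventory.values())
    online = sum(1 for entry in inventory.values() if entry["online"])
    return {"total": len(inventory), "online": online, "by_platform": dict(by_platform)}


# Made-up sample in the same shape as the inventory built above.
sample_inventory = {
    "sid-a": {"hostname": "web-1", "platform": "linux", "tags": ["production"], "online": True},
    "sid-b": {"hostname": "ws-7", "platform": "windows", "tags": [], "online": False},
    "sid-c": {"hostname": "ws-9", "platform": "windows", "tags": [], "online": True},
}
summary = summarize(sample_inventory)
print(summary)
```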

Example 2: D&R Rule Deployment

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization
from limacharlie.sdk.hive import Hive, HiveRecord

client = Client()
org = Organization(client)
hive = Hive(org, "dr-general")

# Deploy a detection rule
rule = HiveRecord(
    name="detect-mimikatz",
    data={
        "detect": {
            "op": "and",
            "event": "NEW_PROCESS",
            "rules": [
                {"op": "is windows"},
                {
                    "op": "contains",
                    "path": "event/COMMAND_LINE",
                    "value": "mimikatz",
                    "case sensitive": False,
                }
            ]
        },
        "respond": [
            {"action": "report", "name": "mimikatz-detected"},
            {"action": "task", "command": "history_dump"},
        ]
    },
    enabled=True,
    tags=["threat-hunting"],
    comment="Detect mimikatz execution",
)
hive.set(rule)
print("Rule deployed successfully")

Example 3: Real-time Detection Monitoring

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization
from limacharlie.sdk.sensor import Sensor
from limacharlie.sdk.spout import Spout

client = Client()
org = Organization(client)

# Stream detections in real-time
spout = Spout(org, data_type="detect")

try:
    while True:
        detection = spout.get(timeout=10)
        if detection is not None:
            routing = detection.get('routing', {})
            print(f"Detection: {detection.get('cat', 'unknown')}")
            print(f"  Sensor: {routing.get('hostname', 'unknown')} ({routing.get('sid', 'unknown')})")

            # Auto-isolate on critical detections
            if 'ransomware' in detection.get('cat', '').lower():
                sid = routing.get('sid')
                if sid:
                    sensor = Sensor(org, sid)
                    sensor.isolate()
                    print("  [!] Sensor isolated due to ransomware detection")
finally:
    spout.shutdown()
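
Isolating a host is disruptive, so it can be worth pulling the decision out of the streaming loop into a pure function that is testable without a live stream. The sketch below mirrors the condition used above; `sid_to_isolate` and its `keywords` parameter are hypothetical.

```python
from typing import Any, Iterable, Mapping, Optional


def sid_to_isolate(
    detection: Mapping[str, Any],
    keywords: Iterable[str] = ("ransomware",),
) -> Optional[str]:
    """Return the sensor ID to isolate, or None if the detection does not qualify."""
    category = str(detection.get("cat", "")).lower()
    sid = detection.get("routing", {}).get("sid")
    if sid and any(keyword in category for keyword in keywords):
        return sid
    return None


print(sid_to_isolate({"cat": "Ransomware-Behavior", "routing": {"sid": "abc"}}))  # abc
print(sid_to_isolate({"cat": "recon", "routing": {"sid": "abc"}}))                # None
```

In the loop, the call to `sensor.isolate()` would then be made only when this function returns a sensor ID.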

Example 4: Extension Request (Playbook Execution)

from limacharlie.client import Client
from limacharlie.sdk.organization import Organization
from limacharlie.sdk.extensions import Extensions

client = Client()
org = Organization(client)
ext = Extensions(org)

# Trigger a playbook via the ext-playbook extension
response = ext.request("ext-playbook", "run_playbook", {
    "name": "my-playbook",
    "credentials": "hive://secret/my-api-key",
    "data": {
        "target_sensor": "SENSOR_ID",
        "action": "investigate",
    }
})

print(response)