Azure Monitor Ingestion SDK for Python. Use for sending custom logs to Log Analytics workspace via Logs Ingestion API.
Send custom logs to Azure Monitor Log Analytics workspace using the Logs Ingestion API.
pip install azure-monitor-ingestion
pip install azure-identity
# Data Collection Endpoint (DCE)
AZURE_DCE_ENDPOINT=https://<dce-name>.<region>.ingest.monitor.azure.com
# Data Collection Rule (DCR) immutable ID
AZURE_DCR_RULE_ID=dcr-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# Stream name from DCR
AZURE_DCR_STREAM_NAME=Custom-MyTable_CL
Before using this SDK, you need:
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import DefaultAzureCredential
import os
client = LogsIngestionClient(
endpoint=os.environ["AZURE_DCE_ENDPOINT"],
credential=DefaultAzureCredential()
)
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import DefaultAzureCredential
import os
client = LogsIngestionClient(
endpoint=os.environ["AZURE_DCE_ENDPOINT"],
credential=DefaultAzureCredential()
)
rule_id = os.environ["AZURE_DCR_RULE_ID"]
stream_name = os.environ["AZURE_DCR_STREAM_NAME"]
logs = [
{"TimeGenerated": "2024-01-15T10:00:00Z", "Computer": "server1", "Message": "Application started"},
{"TimeGenerated": "2024-01-15T10:01:00Z", "Computer": "server1", "Message": "Processing request"},
{"TimeGenerated": "2024-01-15T10:02:00Z", "Computer": "server2", "Message": "Connection established"}
]
client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs)
import json
with open("logs.json", "r") as f:
logs = json.load(f)
client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs)
Handle partial failures with a callback:
failed_logs = []
def on_error(error):
print(f"Upload failed: {error.error}")
failed_logs.extend(error.failed_logs)
client.upload(
rule_id=rule_id,
stream_name=stream_name,
logs=logs,
on_error=on_error
)
# Retry failed logs
if failed_logs:
print(f"Retrying {len(failed_logs)} failed logs...")
client.upload(rule_id=rule_id, stream_name=stream_name, logs=failed_logs)
def ignore_errors(error):
pass # Silently ignore upload failures
client.upload(
rule_id=rule_id,
stream_name=stream_name,
logs=logs,
on_error=ignore_errors
)
import asyncio
from azure.monitor.ingestion.aio import LogsIngestionClient
from azure.identity.aio import DefaultAzureCredential
async def upload_logs():
async with LogsIngestionClient(
endpoint=endpoint,
credential=DefaultAzureCredential()
) as client:
await client.upload(
rule_id=rule_id,
stream_name=stream_name,
logs=logs
)
asyncio.run(upload_logs())
from azure.identity import AzureAuthorityHosts, DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient
# Azure Government
credential = DefaultAzureCredential(authority=AzureAuthorityHosts.AZURE_GOVERNMENT)
client = LogsIngestionClient(
endpoint="https://example.ingest.monitor.azure.us",
credential=credential,
credential_scopes=["https://monitor.azure.us/.default"]
)
The SDK automatically:
No manual batching needed for large log sets.
| Client | Purpose |
|---|---|
LogsIngestionClient | Sync client for uploading logs |
LogsIngestionClient (aio) | Async client for uploading logs |
| Concept | Description |
|---|---|
| DCE | Data Collection Endpoint — ingestion URL |
| DCR | Data Collection Rule — defines schema, transformations, destination |
| Stream | Named data flow within a DCR |
| Custom Table | Target table in Log Analytics (ends with _CL) |
Stream names follow patterns:
Custom-<TableName>_CL — For custom tablesMicrosoft-<TableName> — For built-in tableson_error callback for partial failuresThis skill is applicable to execute the workflow or actions described in the overview.