Direct API Integration for OpenLineage Events¶
Alation Cloud Service Applies to Alation Cloud Service instances of Alation
Overview¶
If your system doesn’t have a pre-built OpenLineage integration, you can directly send OpenLineage events to Alation’s ingestion endpoint using standard HTTP POST requests.
Use the Direct API integration when:
Your orchestration tool doesn’t have an OpenLineage provider
You are building a custom data pipeline or ETL system
You want to send lineage from proprietary applications
You want full control over event emission
For information on general prerequisites, see OpenLineage Integration.
Endpoint Details¶
Base URL Format
https://<your-tenant>.alationcloud.com/open_lineage_event/
HTTP Method
POST
Content Type
application/json
Authentication
Bearer token authentication using an Alation API access token. See Authentication below.
Authentication¶
Create API Access Token¶
Sign in to Alation as a Server Admin.
Navigate to Settings > Server Admin > API Access Tokens.
Click Create Token.
Provide a name and expiration date.
Copy and securely store the token on your local machine.
For detailed instructions, see Create a Refresh Token via the UI.
Request Headers¶
Content-Type: application/json
Authorization: Bearer <your-api-token>
OpenLineage Event Structure¶
Events must follow the OpenLineage specification. At minimum, each event must include:
eventType: Type of event (must beCOMPLETEfor lineage creation)eventTime: ISO 8601 timestamp of the eventrun: Information about the job run (must includerunId)job: Information about the job (must includenamespaceandname)inputs: Array of input datasets (sources) - at least one requiredoutputs: Array of output datasets (targets) - at least one required
Important
Alation only creates lineage from COMPLETE events. Events with type START, RUNNING, FAIL, or ABORT are not processed for lineage creation.
Minimal Event Example¶
{
"eventType": "COMPLETE",
"eventTime": "2024-02-20T10:30:00.000Z",
"run": {
"runId": "d3c5c3e0-8c8a-4b0a-9c1a-3e5e5e5e5e5e"
},
"job": {
"namespace": "my-etl-system",
"name": "daily-sales-aggregation"
},
"inputs": [
{
"namespace": "postgres://prod-db.example.com:5432",
"name": "sales_db.public.transactions"
}
],
"outputs": [
{
"namespace": "snowflake://account.region.snowflakecomputing.com",
"name": "analytics.public.daily_sales"
}
]
}
Complete Event Example with Metadata¶
{
"eventType": "COMPLETE",
"eventTime": "2024-02-20T10:30:00.000Z",
"run": {
"runId": "d3c5c3e0-8c8a-4b0a-9c1a-3e5e5e5e5e5e",
"facets": {
"nominalTime": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/NominalTimeRunFacet.json",
"nominalStartTime": "2024-02-20T00:00:00.000Z",
"nominalEndTime": "2024-02-20T23:59:59.000Z"
}
}
},
"job": {
"namespace": "my-etl-system",
"name": "daily-sales-aggregation",
"facets": {
"documentation": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json",
"description": "Aggregates daily sales transactions into summary tables"
},
"sql": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SqlJobFacet.json",
"query": "INSERT INTO analytics.public.daily_sales SELECT date, SUM(amount), COUNT(*) FROM sales_db.public.transactions GROUP BY date"
}
}
},
"inputs": [
{
"namespace": "postgres://prod-db.example.com:5432",
"name": "sales_db.public.transactions",
"facets": {
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json",
"fields": [
{"name": "transaction_id", "type": "INTEGER"},
{"name": "amount", "type": "DECIMAL"},
{"name": "transaction_date", "type": "TIMESTAMP"}
]
}
}
}
],
"outputs": [
{
"namespace": "snowflake://account.region.snowflakecomputing.com",
"name": "analytics.public.daily_sales",
"facets": {
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json",
"fields": [
{"name": "sale_date", "type": "DATE"},
{"name": "total_amount", "type": "DECIMAL"},
{"name": "transaction_count", "type": "INTEGER"}
]
}
}
}
],
"producer": "https://github.com/myorg/custom-etl/v1.0.0"
}
Implementation Examples¶
cURL Example¶
curl -X POST 'https://your-tenant.alationcloud.com/open_lineage_event/' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer <your-api-token>' \
-d '{
"eventType": "COMPLETE",
"eventTime": "2024-02-20T10:30:00.000Z",
"run": {
"runId": "d3c5c3e0-8c8a-4b0a-9c1a-3e5e5e5e5e5e"
},
"job": {
"namespace": "my-etl-system",
"name": "daily-sales-aggregation"
},
"inputs": [
{
"namespace": "postgres://prod-db.example.com:5432",
"name": "sales_db.public.transactions"
}
],
"outputs": [
{
"namespace": "snowflake://account.region.snowflakecomputing.com",
"name": "analytics.public.daily_sales"
}
]
}'
Python Example¶
Using the requests library:
import requests
import json
from datetime import datetime
import uuid
# Configuration
ALATION_BASE_URL = "https://your-tenant.alationcloud.com"
API_TOKEN = "your-api-token-here"
# OpenLineage event
event = {
"eventType": "COMPLETE",
"eventTime": datetime.utcnow().isoformat() + "Z",
"run": {
"runId": str(uuid.uuid4())
},
"job": {
"namespace": "my-etl-system",
"name": "daily-sales-aggregation"
},
"inputs": [
{
"namespace": "postgres://prod-db.example.com:5432",
"name": "sales_db.public.transactions"
}
],
"outputs": [
{
"namespace": "snowflake://account.region.snowflakecomputing.com",
"name": "analytics.public.daily_sales"
}
]
}
# Send event
response = requests.post(
f"{ALATION_BASE_URL}/open_lineage_event/",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {API_TOKEN}"
},
json=event
)
# Check response
if response.status_code == 200:
print("Event sent successfully")
else:
print(f"Error: {response.status_code} - {response.text}")
Using OpenLineage Python Client¶
For production use, consider using the official OpenLineage Python client:
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job
from openlineage.client.facet import SqlJobFacet
from datetime import datetime
import uuid
# Initialize client
client = OpenLineageClient(
url="https://your-tenant.alationcloud.com/open_lineage_event/",
api_key="your-api-token-here"
)
# Create event
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.utcnow().isoformat() + "Z",
run=Run(runId=str(uuid.uuid4())),
job=Job(
namespace="my-etl-system",
name="daily-sales-aggregation",
facets={
"sql": SqlJobFacet(
query="INSERT INTO analytics.public.daily_sales SELECT date, SUM(amount), COUNT(*) FROM sales_db.public.transactions GROUP BY date"
)
}
),
inputs=[
{
"namespace": "postgres://prod-db.example.com:5432",
"name": "sales_db.public.transactions"
}
],
outputs=[
{
"namespace": "snowflake://account.region.snowflakecomputing.com",
"name": "analytics.public.daily_sales"
}
]
)
# Emit event
client.emit(event)
Java Example¶
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Instant;
import java.util.UUID;
public class OpenLineageClient {
private static final String ALATION_URL = "https://your-tenant.alationcloud.com/open_lineage_event/";
private static final String API_TOKEN = "your-api-token-here";
public static void main(String[] args) throws Exception {
String eventJson = """
{
"eventType": "COMPLETE",
"eventTime": "%s",
"run": {
"runId": "%s"
},
"job": {
"namespace": "my-etl-system",
"name": "daily-sales-aggregation"
},
"inputs": [
{
"namespace": "postgres://prod-db.example.com:5432",
"name": "sales_db.public.transactions"
}
],
"outputs": [
{
"namespace": "snowflake://account.region.snowflakecomputing.com",
"name": "analytics.public.daily_sales"
}
]
}
""".formatted(Instant.now().toString(), UUID.randomUUID().toString());
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(ALATION_URL))
.header("Content-Type", "application/json")
.header("Authorization", "Bearer " + API_TOKEN)
.POST(HttpRequest.BodyPublishers.ofString(eventJson))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println("Status: " + response.statusCode());
System.out.println("Response: " + response.body());
}
}
Response Codes¶
Success responses:
200 OK: Event received and processed successfully
Client error responses:
400 Bad Request: Invalid event format or missing required fields401 Unauthorized: Missing or invalid API token403 Forbidden: API token doesn’t have required permissions
Server error responses:
500 Internal Server Error: Server-side processing error503 Service Unavailable: Service temporarily unavailable
Namespace Format Guidelines¶
Namespace format should match your data source configuration in Alation. The namespace identifies where the data resides.
Common Namespace Patterns¶
Data Source Type |
Namespace Format |
|---|---|
PostgreSQL |
|
Snowflake |
|
MySQL |
|
Redshift |
|
BigQuery |
|
SQL Server |
|
Oracle |
|
S3 |
|
Dataset Name Format¶
Format: database.schema.table (adjust based on your data source structure)
Examples:
PostgreSQL:
sales_db.public.transactionsSnowflake:
analytics.public.daily_salesBigQuery:
project.dataset.table
Testing Your Integration¶
Test Event¶
Start with a simple test event to verify connectivity and authentication:
curl -X POST 'https://your-tenant.alationcloud.com/open_lineage_event/' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer <your-api-token>' \
-d '{
"eventType": "COMPLETE",
"eventTime": "2024-02-20T10:00:00.000Z",
"run": {"runId": "test-run-001"},
"job": {
"namespace": "test-system",
"name": "test-job"
},
"inputs": [
{
"namespace": "postgres://your-cataloged-db:5432",
"name": "test_db.public.source_table"
}
],
"outputs": [
{
"namespace": "postgres://your-cataloged-db:5432",
"name": "test_db.public.target_table"
}
]
}'
Verification Steps¶
Check API response - Verify you receive a
200 OKstatus.View in Alation - Navigate to the output table in Alation catalog.
Check lineage tab - Verify lineage appears on the Lineage tab.
Validate metadata - Confirm job name, namespace, and timestamps are correct.
For troubleshooting issues with OpenLineage integration, see OpenLineage Integration Troubleshooting.
Best Practices¶
Use unique run IDs - Generate a new UUID for each job run.
Include metadata - Add facets for better context (documentation, SQL).
Send
COMPLETEevents - OnlyCOMPLETEevents create lineage.Match namespaces - Ensure namespaces match your Alation data source configuration.
Handle errors - Implement retry logic for transient failures.
Secure tokens - Store API tokens in secrets management systems.
Log events - Keep logs of sent events for debugging.
Test thoroughly - Validate in non-production environment first.
See the following resources for more information:
OpenLineage Specification: https://openlineage.io/docs/spec/
OpenLineage Python Client: https://pypi.org/project/openlineage-python/
General Prerequisites: OpenLineage Integration
Troubleshooting: OpenLineage Integration Troubleshooting