Direct API Integration for OpenLineage Events

Applies to Alation Cloud Service instances of Alation

Overview

If your system doesn’t have a pre-built OpenLineage integration, you can directly send OpenLineage events to Alation’s ingestion endpoint using standard HTTP POST requests.

Use the Direct API integration when:

  • Your orchestration tool doesn’t have an OpenLineage provider

  • You are building a custom data pipeline or ETL system

  • You want to send lineage from proprietary applications

  • You want full control over event emission

For information on general prerequisites, see OpenLineage Integration.

Endpoint Details

Base URL Format

https://<your-tenant>.alationcloud.com/open_lineage_event/

HTTP Method

POST

Content Type

application/json

Authentication

Bearer token authentication using an Alation API access token. See Authentication below.

Authentication

Create API Access Token

  1. Sign in to Alation as a Server Admin.

  2. Navigate to Settings > Server Admin > API Access Tokens.

  3. Click Create Token.

  4. Provide a name and expiration date.

  5. Copy the token and store it securely (for example, in a secrets manager).

For detailed instructions, see Create a Refresh Token via the UI.

Request Headers

Content-Type: application/json
Authorization: Bearer <your-api-token>

OpenLineage Event Structure

Events must follow the OpenLineage specification. At minimum, each event must include:

  • eventType: Type of event (must be COMPLETE for lineage creation)

  • eventTime: ISO 8601 timestamp of the event

  • run: Information about the job run (must include runId)

  • job: Information about the job (must include namespace and name)

  • inputs: Array of input datasets (sources); at least one is required

  • outputs: Array of output datasets (targets); at least one is required

Important

Alation only creates lineage from COMPLETE events. Events with type START, RUNNING, FAIL, or ABORT are not processed for lineage creation.
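Because only COMPLETE events create lineage, it can help to build events through a small helper that always sets the correct type. The sketch below is illustrative; the helper name and shape are not part of the Alation API:

```python
import uuid
from datetime import datetime, timezone

def build_complete_event(job_namespace, job_name, inputs, outputs):
    """Build a minimal OpenLineage event of type COMPLETE, the only type
    Alation processes for lineage creation."""
    return {
        "eventType": "COMPLETE",
        "eventTime": datetime.now(timezone.utc).isoformat(),  # ISO 8601
        "run": {"runId": str(uuid.uuid4())},  # new UUID per run
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": inputs,
        "outputs": outputs,
    }
```

Call this only after the job finishes successfully, so that START or FAIL states never reach the endpoint by mistake.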

Minimal Event Example

{
  "eventType": "COMPLETE",
  "eventTime": "2024-02-20T10:30:00.000Z",
  "run": {
    "runId": "d3c5c3e0-8c8a-4b0a-9c1a-3e5e5e5e5e5e"
  },
  "job": {
    "namespace": "my-etl-system",
    "name": "daily-sales-aggregation"
  },
  "inputs": [
    {
      "namespace": "postgres://prod-db.example.com:5432",
      "name": "sales_db.public.transactions"
    }
  ],
  "outputs": [
    {
      "namespace": "snowflake://account.region.snowflakecomputing.com",
      "name": "analytics.public.daily_sales"
    }
  ]
}
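Before sending, you can sanity-check an event against the required fields listed above. This validation helper is illustrative only; it is not part of the Alation API:

```python
REQUIRED_FIELDS = ("eventType", "eventTime", "run", "job", "inputs", "outputs")

def validate_event(event):
    """Return a list of problems with the event; an empty list means the
    minimum requirements for lineage creation are met."""
    problems = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in event]
    if event.get("eventType") != "COMPLETE":
        problems.append("eventType must be COMPLETE for lineage creation")
    if not event.get("inputs"):
        problems.append("at least one input dataset is required")
    if not event.get("outputs"):
        problems.append("at least one output dataset is required")
    return problems
```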

Complete Event Example with Metadata

{
  "eventType": "COMPLETE",
  "eventTime": "2024-02-20T10:30:00.000Z",
  "run": {
    "runId": "d3c5c3e0-8c8a-4b0a-9c1a-3e5e5e5e5e5e",
    "facets": {
      "nominalTime": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/NominalTimeRunFacet.json",
        "nominalStartTime": "2024-02-20T00:00:00.000Z",
        "nominalEndTime": "2024-02-20T23:59:59.000Z"
      }
    }
  },
  "job": {
    "namespace": "my-etl-system",
    "name": "daily-sales-aggregation",
    "facets": {
      "documentation": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json",
        "description": "Aggregates daily sales transactions into summary tables"
      },
      "sql": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SqlJobFacet.json",
        "query": "INSERT INTO analytics.public.daily_sales SELECT date, SUM(amount), COUNT(*) FROM sales_db.public.transactions GROUP BY date"
      }
    }
  },
  "inputs": [
    {
      "namespace": "postgres://prod-db.example.com:5432",
      "name": "sales_db.public.transactions",
      "facets": {
        "schema": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
          "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json",
          "fields": [
            {"name": "transaction_id", "type": "INTEGER"},
            {"name": "amount", "type": "DECIMAL"},
            {"name": "transaction_date", "type": "TIMESTAMP"}
          ]
        }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "snowflake://account.region.snowflakecomputing.com",
      "name": "analytics.public.daily_sales",
      "facets": {
        "schema": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/client/python",
          "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json",
          "fields": [
            {"name": "sale_date", "type": "DATE"},
            {"name": "total_amount", "type": "DECIMAL"},
            {"name": "transaction_count", "type": "INTEGER"}
          ]
        }
      }
    }
  ],
  "producer": "https://github.com/myorg/custom-etl/v1.0.0"
}

Implementation Examples

cURL Example

curl -X POST 'https://your-tenant.alationcloud.com/open_lineage_event/' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer <your-api-token>' \
  -d '{
    "eventType": "COMPLETE",
    "eventTime": "2024-02-20T10:30:00.000Z",
    "run": {
      "runId": "d3c5c3e0-8c8a-4b0a-9c1a-3e5e5e5e5e5e"
    },
    "job": {
      "namespace": "my-etl-system",
      "name": "daily-sales-aggregation"
    },
    "inputs": [
      {
        "namespace": "postgres://prod-db.example.com:5432",
        "name": "sales_db.public.transactions"
      }
    ],
    "outputs": [
      {
        "namespace": "snowflake://account.region.snowflakecomputing.com",
        "name": "analytics.public.daily_sales"
      }
    ]
  }'

Python Example

Using the requests library:

import requests
import uuid
from datetime import datetime, timezone

# Configuration
ALATION_BASE_URL = "https://your-tenant.alationcloud.com"
API_TOKEN = "your-api-token-here"

# OpenLineage event
event = {
    "eventType": "COMPLETE",
    "eventTime": datetime.now(timezone.utc).isoformat(),
    "run": {
        "runId": str(uuid.uuid4())
    },
    "job": {
        "namespace": "my-etl-system",
        "name": "daily-sales-aggregation"
    },
    "inputs": [
        {
            "namespace": "postgres://prod-db.example.com:5432",
            "name": "sales_db.public.transactions"
        }
    ],
    "outputs": [
        {
            "namespace": "snowflake://account.region.snowflakecomputing.com",
            "name": "analytics.public.daily_sales"
        }
    ]
}

# Send event
response = requests.post(
    f"{ALATION_BASE_URL}/open_lineage_event/",
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_TOKEN}"
    },
    json=event
)

# Check response
if response.status_code == 200:
    print("Event sent successfully")
else:
    print(f"Error: {response.status_code} - {response.text}")

Using OpenLineage Python Client

For production use, consider using the official OpenLineage Python client:

from openlineage.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset
from openlineage.client.facet import SqlJobFacet
from datetime import datetime, timezone
import uuid

# Initialize client. Depending on the client version, the HTTP transport may
# append its own path to the URL; if so, configure the transport endpoint to
# point at /open_lineage_event/ instead.
client = OpenLineageClient(
    url="https://your-tenant.alationcloud.com/open_lineage_event/",
    options=OpenLineageClientOptions(api_key="your-api-token-here")
)

# Create event. Inputs and outputs must be Dataset objects, not plain dicts.
event = RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid.uuid4())),
    job=Job(
        namespace="my-etl-system",
        name="daily-sales-aggregation",
        facets={
            "sql": SqlJobFacet(
                query="INSERT INTO analytics.public.daily_sales SELECT date, SUM(amount), COUNT(*) FROM sales_db.public.transactions GROUP BY date"
            )
        }
    ),
    inputs=[
        Dataset(
            namespace="postgres://prod-db.example.com:5432",
            name="sales_db.public.transactions"
        )
    ],
    outputs=[
        Dataset(
            namespace="snowflake://account.region.snowflakecomputing.com",
            name="analytics.public.daily_sales"
        )
    ],
    producer="https://github.com/myorg/custom-etl/v1.0.0"
)

# Emit event
client.emit(event)

Java Example

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Instant;
import java.util.UUID;

public class OpenLineageClient {
    private static final String ALATION_URL = "https://your-tenant.alationcloud.com/open_lineage_event/";
    private static final String API_TOKEN = "your-api-token-here";

    public static void main(String[] args) throws Exception {
        String eventJson = """
            {
                "eventType": "COMPLETE",
                "eventTime": "%s",
                "run": {
                    "runId": "%s"
                },
                "job": {
                    "namespace": "my-etl-system",
                    "name": "daily-sales-aggregation"
                },
                "inputs": [
                    {
                        "namespace": "postgres://prod-db.example.com:5432",
                        "name": "sales_db.public.transactions"
                    }
                ],
                "outputs": [
                    {
                        "namespace": "snowflake://account.region.snowflakecomputing.com",
                        "name": "analytics.public.daily_sales"
                    }
                ]
            }
            """.formatted(Instant.now().toString(), UUID.randomUUID().toString());

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(ALATION_URL))
            .header("Content-Type", "application/json")
            .header("Authorization", "Bearer " + API_TOKEN)
            .POST(HttpRequest.BodyPublishers.ofString(eventJson))
            .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Status: " + response.statusCode());
        System.out.println("Response: " + response.body());
    }
}

Response Codes

  • Success responses:

    • 200 OK: Event received and processed successfully

  • Client error responses:

    • 400 Bad Request: Invalid event format or missing required fields

    • 401 Unauthorized: Missing or invalid API token

    • 403 Forbidden: API token doesn’t have required permissions

  • Server error responses:

    • 500 Internal Server Error: Server-side processing error

    • 503 Service Unavailable: Service temporarily unavailable
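For the server error responses above, retrying with exponential backoff is a common pattern, while 4xx errors should not be retried because resending the same payload will fail the same way. A sketch, where `send` stands in for whatever HTTP call your pipeline uses:

```python
import time

def post_with_retry(send, event, max_retries=3, base_delay=1.0):
    """Retry transient server errors (HTTP 5xx) and return the final status.

    'send' is any callable that posts the event and returns an HTTP status
    code. Successes (2xx) and client errors (4xx) are returned immediately.
    """
    status = None
    for attempt in range(max_retries):
        status = send(event)
        if status < 500:
            return status  # success or non-retryable client error
        if attempt < max_retries - 1:
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    return status
```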

Namespace Format Guidelines

Namespace format should match your data source configuration in Alation. The namespace identifies where the data resides.

Common Namespace Patterns

  • PostgreSQL: postgres://hostname:port or postgresql://hostname:port

  • Snowflake: snowflake://account.region.snowflakecomputing.com

  • MySQL: mysql://hostname:port

  • Redshift: redshift://cluster.region.redshift.amazonaws.com:5439

  • BigQuery: bigquery://project-id

  • SQL Server: sqlserver://hostname:port

  • Oracle: oracle://hostname:port

  • S3: s3://bucket-name

Dataset Name Format

Format: database.schema.table (adjust based on your data source structure)

Examples:

  • PostgreSQL: sales_db.public.transactions

  • Snowflake: analytics.public.daily_sales

  • BigQuery: project.dataset.table
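A tiny helper can keep namespace and dataset-name formatting consistent across all emitted events. The helper below is a hypothetical convenience, not part of any API:

```python
def dataset_ref(namespace, database, schema, table):
    """Build an OpenLineage dataset reference using the
    database.schema.table naming convention."""
    return {"namespace": namespace, "name": f"{database}.{schema}.{table}"}
```

For example, `dataset_ref("postgres://prod-db.example.com:5432", "sales_db", "public", "transactions")` produces the input dataset used in the examples above.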

Testing Your Integration

Test Event

Start with a simple test event to verify connectivity and authentication:

curl -X POST 'https://your-tenant.alationcloud.com/open_lineage_event/' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer <your-api-token>' \
  -d '{
    "eventType": "COMPLETE",
    "eventTime": "2024-02-20T10:00:00.000Z",
    "run": {"runId": "test-run-001"},
    "job": {
      "namespace": "test-system",
      "name": "test-job"
    },
    "inputs": [
      {
        "namespace": "postgres://your-cataloged-db:5432",
        "name": "test_db.public.source_table"
      }
    ],
    "outputs": [
      {
        "namespace": "postgres://your-cataloged-db:5432",
        "name": "test_db.public.target_table"
      }
    ]
  }'

Verification Steps

  1. Check API response - Verify you receive a 200 OK status.

  2. View in Alation - Navigate to the output table in Alation catalog.

  3. Check lineage tab - Verify lineage appears on the Lineage tab.

  4. Validate metadata - Confirm job name, namespace, and timestamps are correct.

For troubleshooting issues with OpenLineage integration, see OpenLineage Integration Troubleshooting.

Best Practices

  • Use unique run IDs - Generate a new UUID for each job run.

  • Include metadata - Add facets for better context (documentation, SQL).

  • Send COMPLETE events - Only COMPLETE events create lineage.

  • Match namespaces - Ensure namespaces match your Alation data source configuration.

  • Handle errors - Implement retry logic for transient failures.

  • Secure tokens - Store API tokens in secrets management systems.

  • Log events - Keep logs of sent events for debugging.

  • Test thoroughly - Validate in non-production environment first.
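The token-handling and logging practices above can be combined into a small helper. This is a sketch; the `ALATION_API_TOKEN` environment variable name is an assumption:

```python
import json
import logging
import os

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("lineage")

def load_token():
    """Read the API token from the environment (populated by a secrets
    management system) instead of hard-coding it in source."""
    token = os.environ.get("ALATION_API_TOKEN")
    if not token:
        raise RuntimeError("ALATION_API_TOKEN is not set")
    return token

def log_event(event):
    """Record each emitted event (truncated) so failed or missing lineage
    can be debugged later."""
    log.info("sent OpenLineage event: %s", json.dumps(event)[:500])
```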
