Developer Getting Started Guide
This guide helps developers get started with the FedRAMP High Event-Driven Data Mesh, including setting up the development environment, accessing data products, and creating new data products.
Prerequisites
Before you begin, ensure you have the following:
- Access Credentials:
  - AWS IAM credentials with appropriate permissions
  - Databricks workspace access
  - Kafka access credentials
- Required Tools:
  - Go 1.18+ (for the CLI tool)
  - AWS CLI configured with your credentials
  - Git client
  - Docker and Docker Compose (for local development)
  - Terraform (for infrastructure changes)
Installing the CLI Tool
The Data Mesh CLI (dmesh) provides a convenient way to interact with the data mesh from your local machine.
From Binary Release
- Download the latest release for your platform from the releases page
- Extract the archive and move the binary to a location in your PATH:
tar -xzf dmesh_v1.0.0_linux_amd64.tar.gz
chmod +x dmesh
sudo mv dmesh /usr/local/bin/
- Verify the installation:
dmesh --version
From Source
- Clone the repository:
git clone https://github.com/frocore/fedramp-data-mesh.git
cd fedramp-data-mesh
- Build the CLI tool:
make build
- Install the binary:
sudo cp cli/bin/dmesh /usr/local/bin/
Configuring the CLI
- Run the CLI once to create the default configuration:
dmesh info
- Edit the configuration file at ~/.fedramp-data-mesh/config.yaml:
aws_region: us-east-1
aws_profile: fedramp-data-mesh
aws_account_id: "123456789012"
default_role: DataMeshDeveloper
catalog_url: "https://catalog.fedramp-data-mesh.example.com"
s3_data_lake: "s3://fedramp-data-mesh-lake-123456789012-dev"
schema_registry_url: "https://schema-registry.fedramp-data-mesh.example.com"
Discovering Data Products
Using the CLI
- List all available data products:
dmesh discover
- Filter by domain:
dmesh discover --domain project_management
- View details for a specific data product:
dmesh info project_management.project_state_events
- View the schema for a data product:
dmesh schema project_management.project_state_events
Using the Databricks Catalog
- Log in to the Databricks workspace
- Navigate to the Data tab
- Browse the Catalog for available data products
- View table details, schema, and sample data
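If you prefer to browse the catalog from a notebook instead of the UI, a minimal sketch (assuming a Databricks Python notebook where spark and display are predefined):
# List the tables registered under the project_management catalog/schema.
tables = spark.sql("SHOW TABLES IN project_management")
display(tables)

# Inspect the columns of a specific data product table.
spark.table("project_management.project_state_latest").printSchema()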
Querying Data Products
Using the CLI for Queries
- Run a simple query against a data product:
dmesh query "SELECT * FROM project_management.project_state_latest LIMIT 10"
- Use the interactive query UI:
dmesh query --interactive
- Output results in different formats:
dmesh query "SELECT * FROM project_management.project_state_latest LIMIT 10" --output csv
dmesh query "SELECT * FROM project_management.project_state_latest LIMIT 10" --output json
Using Databricks
- Log in to the Databricks workspace
- Create a new notebook
- Use Spark SQL to query data products:
SELECT * FROM project_management.project_state_latest LIMIT 10
- Use Spark DataFrame API:
df = spark.table("project_management.project_state_latest")
display(df.limit(10))
Creating a New Data Product
Define the Schema
Create an Avro schema file for your data product. Example:
{ "type": "record", "name": "TaskStateEvent", "namespace": "com.frocore.projectmanagement.events", "doc": "Represents the current state of a task after a change", "fields": [ { "name": "event_id", "type": "string", "doc": "Unique identifier for this event" }, { "name": "event_timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" }, "doc": "Timestamp when this event was created" }, { "name": "event_type", "type": { "type": "enum", "name": "TaskEventType", "symbols": ["CREATED", "UPDATED", "DELETED"] }, "doc": "Type of event that occurred" }, { "name": "task_id", "type": "string", "doc": "Unique identifier for the task" }, { "name": "project_id", "type": "string", "doc": "ID of the project this task belongs to" }, { "name": "title", "type": "string", "doc": "Task title" }, { "name": "description", "type": ["string", "null"], "doc": "Task description" }, { "name": "status", "type": { "type": "enum", "name": "TaskStatus", "symbols": ["TODO", "IN_PROGRESS", "DONE", "BLOCKED"] }, "doc": "Current status of the task" }, { "name": "assignee_id", "type": ["string", "null"], "doc": "ID of the person assigned to the task" }, { "name": "created_at", "type": { "type": "long", "logicalType": "timestamp-millis" }, "doc": "Timestamp when the task was initially created" }, { "name": "modified_at", "type": { "type": "long", "logicalType": "timestamp-millis" }, "doc": "Timestamp when the task was last modified" } ]}
Register the Schema
Register the schema with the Schema Registry:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data @task_state_event.avsc \
  https://schema-registry.fedramp-data-mesh.example.com/subjects/projects.task_state_events/versions
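If your registry follows the Confluent-compatible REST API, the request body must wrap the schema text in a {"schema": "..."} JSON envelope rather than posting the raw .avsc contents. A scripted alternative to the curl call above, assuming the requests package:
# Register the schema by wrapping the .avsc contents in the registry's JSON envelope.
import json
import requests

registry = "https://schema-registry.fedramp-data-mesh.example.com"
subject = "projects.task_state_events"

with open("task_state_event.avsc") as f:
    schema_str = f.read()

resp = requests.post(
    f"{registry}/subjects/{subject}/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    data=json.dumps({"schema": schema_str}),
    timeout=10,
)
resp.raise_for_status()
print("Registered schema id:", resp.json()["id"])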
Create a Data Product Definition
Create a YAML definition for your data product:
kind: DataProduct
apiVersion: datamesh.frocore.io/v1
metadata:
  name: task_state_events
  domain: project_management
  owner: project-management-team@frocore.io
  description: State events for task entities
  documentation: |
    This data product captures the state of each task after changes.
    It is related to the project_state_events data product.
spec:
  schemaRef:
    type: avro
    path: /domains/project-management/schemas/task_state_event.avsc
  eventStream:
    topicName: projects.task_state_events
    partitionKey: task_id
    retention:
      time: 30d
    replication: 3
  tables:
    - name: task_state_history
      catalog: project_management
      format: iceberg
      location: s3://fedramp-data-mesh-lake/project_management/task_state_history
      partitioning:
        - name: event_date
          transform: "day(event_timestamp)"
    - name: task_state_latest
      catalog: project_management
      format: iceberg
      location: s3://fedramp-data-mesh-lake/project_management/task_state_latest
      retention:
        snapshots: 5
  sla:
    latency: 1m
    availability: 99.9%
  securityClassification: CONTROLLED_UNCLASSIFIED
  lineage:
    upstream:
      - source: projects-db.public.tasks
        type: database-table
  access:
    roles:
      - name: project_admin
        permissions: [read, write]
      - name: project_analyst
        permissions: [read]
      - name: data_engineer
        permissions: [read]
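Before submitting the definition, a quick check that it loads and carries the required metadata can save a review round trip. A hypothetical sketch, assuming PyYAML is installed and the definition is saved locally as task_state_events.yaml:
# Hypothetical pre-submit check of the data product definition.
import yaml

with open("task_state_events.yaml") as f:
    product = yaml.safe_load(f)

assert product["kind"] == "DataProduct", "unexpected kind"
for key in ("name", "domain", "owner", "description"):
    assert key in product["metadata"], f"missing metadata.{key}"

print("Definition looks complete:", product["metadata"]["name"])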
Configure the Source Connector
Create a configuration for your CDC connector:
{ "name": "tasks-source-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "${DB_HOST}", "database.port": "${DB_PORT}", "database.user": "${DB_USER}", "database.password": "${DB_PASSWORD}", "database.dbname": "frocore", "database.server.name": "frocore-projects", "table.include.list": "public.tasks", "schema.include.list": "public", "database.history.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}", "database.history.kafka.topic": "schema-changes.frocore.tasks", "transforms": "unwrap,AddSourceMetadata", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.AddSourceMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.AddSourceMetadata.static.field": "source_system", "transforms.AddSourceMetadata.static.value": "projects-db", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "${SCHEMA_REGISTRY_URL}", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "${SCHEMA_REGISTRY_URL}" }}
Implement the Data Processor
Create a Spark job to process the events:
package com.frocore.datamesh.project_management.processors
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.avro.functions._
import java.util.UUID
object TaskStateProcessor {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Task State Processor")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.ice", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.ice.type", "hadoop")
      .config("spark.sql.catalog.ice.warehouse", "s3a://fedramp-data-mesh-warehouse/")
      .getOrCreate()

    import spark.implicits._

    // Read from Kafka topic
    val kafkaStreamDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "${KAFKA_BOOTSTRAP_SERVERS}")
      .option("subscribe", "projects.task_state_events")
      .option("startingOffsets", "latest")
      .option("kafka.security.protocol", "SSL")
      .load()
    // Parse the Avro payload.
    // Note: from_avro expects the Avro schema as a JSON string; "task_state_event_schema"
    // is a placeholder for the contents of task_state_event.avsc.
    val parsedDF = kafkaStreamDF
      .select(
        col("key").cast("string").as("kafka_key"),
        col("timestamp").as("kafka_timestamp"),
        from_avro(col("value"), "task_state_event_schema").as("event")
      )
      .select(
        col("kafka_key"),
        col("kafka_timestamp"),
        col("event.*")
      )
    // Add processing metadata
    val enrichedDF = parsedDF
      .withColumn("processing_time", current_timestamp())
      .withColumn("processing_id", lit(UUID.randomUUID().toString))

    // Write to Iceberg table (history)
    val historyQuery = enrichedDF
      .writeStream
      .format("iceberg")
      .outputMode("append")
      .option("path", "ice.project_management.task_state_history")
      .option("checkpointLocation", "s3a://fedramp-data-mesh-checkpoints/task_state_processor/history/")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

    // Write to Iceberg table (latest state)
    val latestQuery = enrichedDF
      .writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // Upsert (merge) the latest state
        batchDF.createOrReplaceTempView("updates")

        spark.sql("""
          MERGE INTO ice.project_management.task_state_latest target
          USING (
            SELECT
              task_id, project_id, title, description, status,
              assignee_id, created_at, modified_at, event_timestamp
            FROM updates
          ) source
          ON target.task_id = source.task_id
          WHEN MATCHED THEN UPDATE SET *
          WHEN NOT MATCHED THEN INSERT *
        """)
      }
      .option("checkpointLocation", "s3a://fedramp-data-mesh-checkpoints/task_state_processor/latest/")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

    spark.streams.awaitAnyTermination()
  }
}
Deploy the Data Product
- Submit your Kafka connector configuration (a status-check sketch follows this list):
curl -X POST -H "Content-Type: application/json" \
  --data @tasks-source-connector.json \
  http://kafka-connect.fedramp-data-mesh.example.com:8083/connectors
- Deploy the Spark job to Databricks:
databricks jobs create --json @job-config.json
- Register the data product tables in Unity Catalog:
CREATE TABLE project_management.task_state_history
USING iceberg
LOCATION 's3://fedramp-data-mesh-lake/project_management/task_state_history';
CREATE TABLE project_management.task_state_latest
USING iceberg
LOCATION 's3://fedramp-data-mesh-lake/project_management/task_state_latest';
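After submitting the connector, you can confirm that it and its tasks report RUNNING via the Kafka Connect REST API status endpoint on the same host used above; a minimal check, assuming the requests package:
# Check the Debezium connector status after deployment.
import requests

connect_url = "http://kafka-connect.fedramp-data-mesh.example.com:8083"
resp = requests.get(f"{connect_url}/connectors/tasks-source-connector/status", timeout=10)
resp.raise_for_status()

status = resp.json()
print("connector:", status["connector"]["state"])
for task in status["tasks"]:
    print(f"task {task['id']}:", task["state"])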
Next Steps
- Learn more about Data Product Design Patterns
- Explore Advanced Querying Techniques
- Understand Event Schema Evolution
- Set up Data Quality Monitoring