S3 Data Lake
Important Capabilities
Capability | Status | Notes |
---|---|---|
Data Profiling | ✅ | Optionally enabled via configuration |
Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata |
Extract Tags | ✅ | Can extract S3 object/bucket tags if enabled |
This plugin extracts:
- Row and column counts for each table
- For each column, if profiling is enabled:
- null counts and proportions
- distinct counts and proportions
- minimum, maximum, mean, median, standard deviation, some quantile values
- histograms or frequencies of unique values
This connector supports both local files as well as those stored on AWS S3 (which must be identified using the prefix s3://
). Supported file types are as follows:
- CSV
- TSV
- JSON
- Parquet
- Apache Avro
Schemas for Parquet and Avro files are extracted as provided.
Schemas for schemaless formats (CSV, TSV, JSON) are inferred. For CSV and TSV files, we consider the first 100 rows by default, which can be controlled via the max_rows
recipe parameter (see below)
JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few objects of the file), which may impact performance.
We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.
To ingest datasets from your data lake, you need to provide the dataset path format specifications using path_specs
configuration in ingestion recipe.
Refer section Path Specs for examples.
Note that because the profiling is run with PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed (see compatibility for more details). If profiling, make sure that permissions for s3a:// access are set because Spark and Hadoop use the s3a:// protocol to interface with AWS (schema inference outside of profiling requires s3:// access). Enabling profiling will slow down ingestion runs.
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[s3]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
type: s3
config:
path_specs:
-
include: "s3://covid19-lake/covid_knowledge_graph/csv/nodes/*.*"
aws_config:
aws_access_key_id: *****
aws_secret_access_key: *****
aws_region: us-east-2
env: "PROD"
profiling:
enabled: false
# sink configs
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
max_rows integer | Maximum number of rows to use when inferring schemas for TSV and CSV files. Default: 100 |
number_of_files_to_sample integer | Number of files to list to sample for schema inference. This will be ignored if sample_files is set to False in the pathspec. Default: 100 |
platform string | The platform that this source connects to (either 's3' or 'file'). If not specified, the platform will be inferred from the path_specs. Default: |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to |
spark_driver_memory string | Max amount of memory to grant Spark. Default: 4g |
use_s3_bucket_tags boolean | Whether or not to create tags in datahub from the s3 bucket |
use_s3_object_tags boolean | Whether or not to create tags in datahub from the s3 object |
verify_ssl One of boolean, string | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Default: True |
env string | The environment that all assets produced by this connector belong to Default: PROD |
aws_config AwsConnectionConfig | AWS configuration |
aws_config.aws_region ❓ string | AWS region code. |
aws_config.aws_access_key_id string | AWS access key ID. Can be auto-detected, see the AWS boto3 docs for details. |
aws_config.aws_endpoint_url string | The AWS service endpoint. This is normally constructed automatically, but can be overridden here. |
aws_config.aws_profile string | Named AWS profile to use. Only used if access key / secret are unset. If not set the default will be used |
aws_config.aws_proxy map(str,string) | |
aws_config.aws_secret_access_key string | AWS secret access key. Can be auto-detected, see the AWS boto3 docs for details. |
aws_config.aws_session_token string | AWS session token. Can be auto-detected, see the AWS boto3 docs for details. |
aws_config.aws_role One of string, union(anyOf), string, AwsAssumeRoleConfig | AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are documented at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role |
aws_config.aws_role.RoleArn ❓ string | ARN of the role to assume. |
aws_config.aws_role.ExternalId string | External ID to use when assuming the role. |
path_specs array(object) | |
path_specs.include ❓ string | Path to table. Name variable {table} is used to mark the folder with dataset. In absence of {table} , file level dataset will be created. Check below examples for more details. |
path_specs.default_extension string | For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped. |
path_specs.enable_compression boolean | Enable or disable processing compressed files. Currently .gz and .bz files are supported. Default: True |
path_specs.exclude array(string) | |
path_specs.file_types array(string) | |
path_specs.sample_files boolean | Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled Default: True |
path_specs.table_name string | Display name of the dataset.Combination of named variables from include path and strings |
profile_patterns AllowDenyPattern | regex patterns for tables to profile Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
profile_patterns.allow array(string) | |
profile_patterns.deny array(string) | |
profile_patterns.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
profiling DataLakeProfilerConfig | Data profiling configuration Default: {'enabled': False, 'profile_table_level_only': Fal... |
profiling.enabled boolean | Whether profiling should be done. Default: False |
profiling.include_field_distinct_value_frequencies boolean | Whether to profile for distinct value frequencies. Default: True |
profiling.include_field_histogram boolean | Whether to profile for the histogram for numeric fields. Default: True |
profiling.include_field_max_value boolean | Whether to profile for the max value of numeric columns. Default: True |
profiling.include_field_mean_value boolean | Whether to profile for the mean value of numeric columns. Default: True |
profiling.include_field_median_value boolean | Whether to profile for the median value of numeric columns. Default: True |
profiling.include_field_min_value boolean | Whether to profile for the min value of numeric columns. Default: True |
profiling.include_field_null_count boolean | Whether to profile for the number of nulls for each column. Default: True |
profiling.include_field_quantiles boolean | Whether to profile for the quantiles of numeric columns. Default: True |
profiling.include_field_sample_values boolean | Whether to profile for the sample values for all columns. Default: True |
profiling.include_field_stddev_value boolean | Whether to profile for the standard deviation of numeric columns. Default: True |
profiling.max_number_of_fields_to_profile integer | A positive integer that specifies the maximum number of columns to profile for any table. None implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up. |
profiling.profile_table_level_only boolean | Whether to perform profiling at table-level only or include column-level profiling as well. Default: False |
stateful_ingestion StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. |
stateful_ingestion.enabled boolean | The type of the ingestion state provider registered with datahub. Default: False |
stateful_ingestion.ignore_new_state boolean | If set to True, ignores the current checkpoint state. Default: False |
stateful_ingestion.ignore_old_state boolean | If set to True, ignores the previous checkpoint state. Default: False |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
The JSONSchema for this configuration is inlined below.
{
"title": "DataLakeSourceConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"path_specs": {
"title": "Path Specs",
"description": "List of PathSpec. See [below](#path-spec) the details about PathSpec",
"type": "array",
"items": {
"$ref": "#/definitions/PathSpec"
}
},
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"stateful_ingestion": {
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
},
"platform": {
"title": "Platform",
"description": "The platform that this source connects to (either 's3' or 'file'). If not specified, the platform will be inferred from the path_specs.",
"default": "",
"type": "string"
},
"aws_config": {
"title": "Aws Config",
"description": "AWS configuration",
"allOf": [
{
"$ref": "#/definitions/AwsConnectionConfig"
}
]
},
"use_s3_bucket_tags": {
"title": "Use S3 Bucket Tags",
"description": "Whether or not to create tags in datahub from the s3 bucket",
"type": "boolean"
},
"use_s3_object_tags": {
"title": "Use S3 Object Tags",
"description": "Whether or not to create tags in datahub from the s3 object",
"type": "boolean"
},
"profile_patterns": {
"title": "Profile Patterns",
"description": "regex patterns for tables to profile ",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"profiling": {
"title": "Profiling",
"description": "Data profiling configuration",
"default": {
"enabled": false,
"profile_table_level_only": false,
"max_number_of_fields_to_profile": null,
"include_field_null_count": true,
"include_field_min_value": true,
"include_field_max_value": true,
"include_field_mean_value": true,
"include_field_median_value": true,
"include_field_stddev_value": true,
"include_field_quantiles": true,
"include_field_distinct_value_frequencies": true,
"include_field_histogram": true,
"include_field_sample_values": true
},
"allOf": [
{
"$ref": "#/definitions/DataLakeProfilerConfig"
}
]
},
"spark_driver_memory": {
"title": "Spark Driver Memory",
"description": "Max amount of memory to grant Spark.",
"default": "4g",
"type": "string"
},
"max_rows": {
"title": "Max Rows",
"description": "Maximum number of rows to use when inferring schemas for TSV and CSV files.",
"default": 100,
"type": "integer"
},
"verify_ssl": {
"title": "Verify Ssl",
"description": "Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.",
"default": true,
"anyOf": [
{
"type": "boolean"
},
{
"type": "string"
}
]
},
"number_of_files_to_sample": {
"title": "Number Of Files To Sample",
"description": "Number of files to list to sample for schema inference. This will be ignored if sample_files is set to False in the pathspec.",
"default": 100,
"type": "integer"
}
},
"required": [
"path_specs"
],
"additionalProperties": false,
"definitions": {
"PathSpec": {
"title": "PathSpec",
"type": "object",
"properties": {
"include": {
"title": "Include",
"description": "Path to table. Name variable `{table}` is used to mark the folder with dataset. In absence of `{table}`, file level dataset will be created. Check below examples for more details.",
"type": "string"
},
"exclude": {
"title": "Exclude",
"description": "list of paths in glob pattern which will be excluded while scanning for the datasets",
"type": "array",
"items": {
"type": "string"
}
},
"file_types": {
"title": "File Types",
"description": "Files with extenstions specified here (subset of default value) only will be scanned to create dataset. Other files will be omitted.",
"default": [
"csv",
"tsv",
"json",
"parquet",
"avro"
],
"type": "array",
"items": {
"type": "string"
}
},
"default_extension": {
"title": "Default Extension",
"description": "For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped.",
"type": "string"
},
"table_name": {
"title": "Table Name",
"description": "Display name of the dataset.Combination of named variables from include path and strings",
"type": "string"
},
"enable_compression": {
"title": "Enable Compression",
"description": "Enable or disable processing compressed files. Currently .gz and .bz files are supported.",
"default": true,
"type": "boolean"
},
"sample_files": {
"title": "Sample Files",
"description": "Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled",
"default": true,
"type": "boolean"
}
},
"required": [
"include"
],
"additionalProperties": false
},
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "The type of the ingestion state provider registered with datahub.",
"default": false,
"type": "boolean"
},
"ignore_old_state": {
"title": "Ignore Old State",
"description": "If set to True, ignores the previous checkpoint state.",
"default": false,
"type": "boolean"
},
"ignore_new_state": {
"title": "Ignore New State",
"description": "If set to True, ignores the current checkpoint state.",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"AwsAssumeRoleConfig": {
"title": "AwsAssumeRoleConfig",
"type": "object",
"properties": {
"RoleArn": {
"title": "Rolearn",
"description": "ARN of the role to assume.",
"type": "string"
},
"ExternalId": {
"title": "Externalid",
"description": "External ID to use when assuming the role.",
"type": "string"
}
},
"required": [
"RoleArn"
]
},
"AwsConnectionConfig": {
"title": "AwsConnectionConfig",
"description": "Common AWS credentials config.\n\nCurrently used by:\n - Glue source\n - SageMaker source\n - dbt source",
"type": "object",
"properties": {
"aws_access_key_id": {
"title": "Aws Access Key Id",
"description": "AWS access key ID. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"type": "string"
},
"aws_secret_access_key": {
"title": "Aws Secret Access Key",
"description": "AWS secret access key. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"type": "string"
},
"aws_session_token": {
"title": "Aws Session Token",
"description": "AWS session token. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"type": "string"
},
"aws_role": {
"title": "Aws Role",
"description": "AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are documented at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role",
"anyOf": [
{
"type": "string"
},
{
"type": "array",
"items": {
"anyOf": [
{
"type": "string"
},
{
"$ref": "#/definitions/AwsAssumeRoleConfig"
}
]
}
}
]
},
"aws_profile": {
"title": "Aws Profile",
"description": "Named AWS profile to use. Only used if access key / secret are unset. If not set the default will be used",
"type": "string"
},
"aws_region": {
"title": "Aws Region",
"description": "AWS region code.",
"type": "string"
},
"aws_endpoint_url": {
"title": "Aws Endpoint Url",
"description": "The AWS service endpoint. This is normally [constructed automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html), but can be overridden here.",
"type": "string"
},
"aws_proxy": {
"title": "Aws Proxy",
"description": "A set of proxy configs to use with AWS. See the [botocore.config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) docs for details.",
"type": "object",
"additionalProperties": {
"type": "string"
}
}
},
"required": [
"aws_region"
],
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"DataLakeProfilerConfig": {
"title": "DataLakeProfilerConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether profiling should be done.",
"default": false,
"type": "boolean"
},
"profile_table_level_only": {
"title": "Profile Table Level Only",
"description": "Whether to perform profiling at table-level only or include column-level profiling as well.",
"default": false,
"type": "boolean"
},
"max_number_of_fields_to_profile": {
"title": "Max Number Of Fields To Profile",
"description": "A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
"exclusiveMinimum": 0,
"type": "integer"
},
"include_field_null_count": {
"title": "Include Field Null Count",
"description": "Whether to profile for the number of nulls for each column.",
"default": true,
"type": "boolean"
},
"include_field_min_value": {
"title": "Include Field Min Value",
"description": "Whether to profile for the min value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_max_value": {
"title": "Include Field Max Value",
"description": "Whether to profile for the max value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_mean_value": {
"title": "Include Field Mean Value",
"description": "Whether to profile for the mean value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_median_value": {
"title": "Include Field Median Value",
"description": "Whether to profile for the median value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_stddev_value": {
"title": "Include Field Stddev Value",
"description": "Whether to profile for the standard deviation of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_quantiles": {
"title": "Include Field Quantiles",
"description": "Whether to profile for the quantiles of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_distinct_value_frequencies": {
"title": "Include Field Distinct Value Frequencies",
"description": "Whether to profile for distinct value frequencies.",
"default": true,
"type": "boolean"
},
"include_field_histogram": {
"title": "Include Field Histogram",
"description": "Whether to profile for the histogram for numeric fields.",
"default": true,
"type": "boolean"
},
"include_field_sample_values": {
"title": "Include Field Sample Values",
"description": "Whether to profile for the sample values for all columns.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
}
}
}
Path Specs
Example - Dataset per file
Bucket structure:
test-s3-bucket
├── employees.csv
└── food_items.csv
Path specs config
path_specs:
- include: s3://test-s3-bucket/*.csv
Example - Datasets with partitions
Bucket structure:
test-s3-bucket
├── orders
│ └── year=2022
│ └── month=2
│ ├── 1.parquet
│ └── 2.parquet
└── returns
└── year=2021
└── month=2
└── 1.parquet
Path specs config:
path_specs:
- include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
Example - Datasets with partition and exclude
Bucket structure:
test-s3-bucket
├── orders
│ └── year=2022
│ └── month=2
│ ├── 1.parquet
│ └── 2.parquet
└── tmp_orders
└── year=2021
└── month=2
└── 1.parquet
Path specs config:
path_specs:
- include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
exclude:
- **/tmp_orders/**
Example - Datasets of mixed nature
Bucket structure:
test-s3-bucket
├── customers
│ ├── part1.json
│ ├── part2.json
│ ├── part3.json
│ └── part4.json
├── employees.csv
├── food_items.csv
├── tmp_10101000.csv
└── orders
└── year=2022
└── month=2
├── 1.parquet
├── 2.parquet
└── 3.parquet
Path specs config:
path_specs:
- include: s3://test-s3-bucket/*.csv
exclude:
- **/tmp_10101000.csv
- include: s3://test-s3-bucket/{table}/*.json
- include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
Valid path_specs.include
s3://my-bucket/foo/tests/bar.avro # single file table
s3://my-bucket/foo/tests/*.* # mulitple file level tables
s3://my-bucket/foo/tests/{table}/*.avro #table without partition
s3://my-bucket/foo/tests/{table}/*/*.avro #table where partitions are not specified
s3://my-bucket/foo/tests/{table}/*.* # table where no partitions as well as data type specified
s3://my-bucket/{dept}/tests/{table}/*.avro # specifying keywords to be used in display name
s3://my-bucket/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro # specify partition key and value format
s3://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro # specify partition value only format
s3://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # for all extensions
s3://my-bucket/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 2 levels down in bucket
s3://my-bucket/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 3 levels down in bucket
Valid path_specs.exclude
- **/tests/**
- s3://my-bucket/hr/**
- */tests/.csv
- s3://my-bucket/foo/*/my_table/**
Notes
- {table} represents folder for which dataset will be created.
- include path must end with (. or *.[ext]) to represent leaf level.
- if *.[ext] is provided then only files with specified type will be scanned.
- /*/ represents single folder.
- {partition[i]} represents value of partition.
- {partition_key[i]} represents name of the partition.
- While extracting, “i” will be used to match partition_key to partition.
- all folder levels need to be specified in include. Only exclude path can have ** like matching.
- exclude path cannot have named variables ( {} ).
- Folder names should not contain {, }, *, / in their names.
- {folder} is reserved for internal working. please do not use in named variables.
If you would like to write a more complicated function for resolving file names, then a {transformer} would be a good fit.
Specify as long fixed prefix ( with out /*/ ) as possible in path_specs.include
. This will reduce the scanning time and cost, specifically on AWS S3
Running profiling against many tables or over many rows can run up significant costs. While we've done our best to limit the expensiveness of the queries the profiler runs, you should be prudent about the set of tables profiling is enabled on or the frequency of the profiling runs.
If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.
Compatibility
Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the SPARK_HOME
and SPARK_VERSION
environment variables to be set. The Spark+Hadoop binary can be downloaded here.
For an example guide on setting up PyDeequ on AWS, see this guide.
Code Coordinates
- Class Name:
datahub.ingestion.source.s3.source.S3Source
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for S3 Data Lake, feel free to ping us on our Slack.