CLI¶
Amora ships with a command-line interface, built on top of tiangolo/typer. To see all available options, run `amora --help` after installing the package:
```
Usage: amora [OPTIONS] COMMAND [ARGS]...

  Amora Data Build Tool enables engineers to transform data in their
  warehouses by defining schemas and writing select statements with
  SQLAlchemy. Amora handles turning these select statements into tables and
  views

Options:
  --install-completion  Install completion for the current shell.
  --show-completion     Show completion for the current shell, to copy it or
                        customize the installation.
  --help                Show this message and exit.

Commands:
  compile      Generates executable SQL from model files.
  materialize  Executes the compiled SQL against the current target...
  models
  test         Runs tests on data in deployed models.
```
amora compile¶
Generates executable SQL from model files. Compiled SQL files are written to the `./target` directory.
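A couple of typical invocations, as a sketch (the `--models` option name comes from the shared `models_option` used throughout the CLI; `--force` is assumed from `force_option`, and the model name is hypothetical):

```shell
# compile every model file into executable SQL under ./target
amora compile

# force a from-scratch compilation of a subset of models (assumed flag names)
amora compile --models my_model --force
```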
Source code in amora/cli/typer_app.py
```python
@app.command()
def compile(
    models: Optional[Models] = models_option,
    target: Optional[str] = target_option,
    force: Optional[bool] = force_option,
) -> None:
    """
    Generates executable SQL from model files. Compiled SQL files are written to the `./target` directory.
    """
    current_manifest = manifest.Manifest.from_project()
    previous_manifest = manifest.Manifest.load()

    if force or not previous_manifest:
        compilation.remove_compiled_files()
        models_to_compile = list_models()
    else:
        removed = previous_manifest.models.keys() - current_manifest.models.keys()
        compilation.remove_compiled_files(removed)
        models_to_compile = current_manifest.get_models_to_compile(previous_manifest)

    for model, model_file_path in models_to_compile:
        if models and not force and model_file_path.stem not in models:
            continue

        source_sql_statement = model.source()
        if source_sql_statement is None:
            typer.echo(f"⏭ Skipping compilation of model `{model_file_path}`")
            continue

        target_file_path = model.target_path()
        typer.echo(f"🏗 Compiling model `{model_file_path}` -> `{target_file_path}`")

        content = compilation.compile_statement(source_sql_statement)

        target_file_path.parent.mkdir(parents=True, exist_ok=True)
        target_file_path.write_text(content)

    current_manifest.save()
```
amora materialize¶
Executes the compiled SQL against the current target database.
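For example (`--draw-dag` and `--depends` are defined in the source below; the `--models` option name comes from the shared `models_option`, and the model name is hypothetical):

```shell
# compile and materialize every model, rendering the dependency DAG first
amora materialize --draw-dag

# materialize one model plus everything that depends on it (hypothetical model name)
amora materialize --models my_model --depends
```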
Source code in amora/cli/typer_app.py
```python
@app.command()
def materialize(
    models: Optional[Models] = models_option,
    target: str = target_option,
    draw_dag: bool = typer.Option(False, "--draw-dag"),
    depends: bool = typer.Option(
        False, "--depends", help="Flag to materialize also the dependents of the model"
    ),
    no_compile: bool = typer.Option(
        False,
        "--no-compile",
        help="Don't run `amora compile` before the materialization",
    ),
) -> None:
    """
    Executes the compiled SQL against the current target database.
    """
    if not no_compile:
        force = depends and models != []
        compile(models=models, target=target, force=force)

    model_to_task: Dict[str, materialization.Task] = {}

    for target_file_path in utils.list_target_files():
        task = materialization.Task.for_target(target_file_path)
        if models and target_file_path.stem not in models:
            continue

        model_to_task[task.model.unique_name()] = task

        if depends:
            for dependency_target_path in utils.recursive_dependencies_targets(
                task.model
            ):
                dependency_task = materialization.Task.for_target(
                    dependency_target_path
                )
                model_to_task[dependency_task.model.unique_name()] = dependency_task

    dag = DependencyDAG.from_tasks(tasks=model_to_task.values())

    if draw_dag:
        dag.draw()

    with futures.ProcessPoolExecutor(
        max_workers=settings.MATERIALIZE_NUM_THREADS
    ) as executor:
        for models_to_materialize in dag.topological_generations():
            current_tasks: List[materialization.Task] = []
            for model_name in models_to_materialize:
                if model_name in model_to_task:
                    current_tasks.append(model_to_task[model_name])
                else:
                    typer.echo(f"⚠️ Skipping `{model_name}`")
                    continue

            if not current_tasks:
                continue

            results = executor.map(
                materialization.materialize,
                [current_task.sql_stmt for current_task in current_tasks],
                [current_task.model.unique_name() for current_task in current_tasks],
                [current_task.model.__model_config__ for current_task in current_tasks],
            )

            for result in results:
                if result:
                    typer.echo(result)
```
amora test¶
Runs tests on data in deployed models. Run this after `amora materialize` to ensure that the data state is up-to-date. Optional arguments are passed to pytest.
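Since extra arguments are forwarded to pytest, the usual pytest flags work. A sketch (`-x` and `-k` are standard pytest options; the keyword selector is hypothetical):

```shell
# run the whole data test suite
amora test

# stop on the first failure, running only tests matching a keyword (hypothetical)
amora test -x -k heart_rate
```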
Source code in amora/cli/typer_app.py
```python
@app.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
)
def test(ctx: typer.Context) -> None:
    """
    Runs tests on data in deployed models. Run this after `amora materialize`
    to ensure that the data state is up-to-date. Optional arguments are passed
    to pytest.
    """
    pytest_args = settings.DEFAULT_PYTEST_ARGS + ctx.args
    return_code = pytest.main(pytest_args)
    raise typer.Exit(return_code)
```
amora models¶
amora models list¶
Lists the models in your project as a human-readable table or as a JSON-serialized document. You can also use the `--with-total-bytes` option, which relies on BigQuery's query dry-run feature to gather each model's total bytes.
Source code in amora/cli/models.py
````python
@app.command(name="list")
def models_list(
    format: str = typer.Option(
        "table",
        help="Output format. Options: json,table",
    ),
    with_total_bytes: bool = typer.Option(
        False,
        help="Uses BigQuery query dry run feature "
        "to gather model total bytes information",
    ),
) -> None:
    """
    List the models in your project as a human readable table
    or as a JSON serialized document

    ```shell
    amora models list
    ```

    You can also use the option `--with-total-bytes` to use
    BigQuery query dry run feature to gather model total bytes information

    ```shell
    amora models list --with-total-bytes
    ```
    """

    @dataclass
    class ResultItem:
        model: Model
        dry_run_result: Optional[DryRunResult] = None

        def as_dict(self):
            return {
                "depends_on": self.depends_on,
                "has_source": self.has_source,
                "materialization_type": self.materialization_type,
                "model_name": self.model_name,
                "referenced_tables": self.referenced_tables,
                "total_bytes": self.total_bytes,
                "estimated_query_cost_in_usd": self.estimated_query_cost_in_usd,
                "estimated_storage_cost_in_usd": self.estimated_storage_cost_in_usd,
            }

        @property
        def model_name(self):
            return self.model.__name__

        @property
        def has_source(self):
            return self.model.source() is not None

        @property
        def depends_on(self) -> List[str]:
            return sorted(dependency.name for dependency in self.model.dependencies())

        @property
        def estimated_query_cost_in_usd(self) -> Optional[str]:
            if self.dry_run_result:
                cost = estimated_query_cost_in_usd(self.dry_run_result.total_bytes)
                return f"{cost:.{settings.MONEY_DECIMAL_PLACES}f}"
            return None

        @property
        def estimated_storage_cost_in_usd(self) -> Optional[str]:
            if self.dry_run_result:
                cost = estimated_storage_cost_in_usd(self.dry_run_result.total_bytes)
                return f"{cost:.{settings.MONEY_DECIMAL_PLACES}f}"
            return None

        @property
        def total_bytes(self) -> Optional[int]:
            if self.dry_run_result:
                return self.dry_run_result.total_bytes
            return None

        @property
        def referenced_tables(self) -> List[str]:
            if self.dry_run_result:
                return self.dry_run_result.referenced_tables
            return []

        @property
        def materialization_type(self) -> Optional[str]:
            if self.has_source:
                return self.model.__model_config__.materialized.value
            return None

    results = []
    placeholder = "-"

    for model, _model_file_path in list_models():
        if with_total_bytes:
            result_item = ResultItem(model=model, dry_run_result=dry_run(model))
        else:
            result_item = ResultItem(model=model, dry_run_result=None)

        results.append(result_item)

    if format == "table":
        table = Table(
            show_header=True,
            header_style="bold",
            show_lines=True,
            width=settings.CLI_CONSOLE_MAX_WIDTH,
            row_styles=["none", "dim"],
        )
        table.add_column("Model name", style="green bold", no_wrap=True)
        table.add_column("Total bytes", no_wrap=True)
        table.add_column("Estimated query cost", no_wrap=True)
        table.add_column("Estimated storage cost", no_wrap=True)
        table.add_column("Referenced tables")
        table.add_column("Depends on")
        table.add_column("Has source?", no_wrap=True, justify="center")
        table.add_column("Materialization", no_wrap=True)

        for result in results:
            table.add_row(
                result.model_name,
                f"{result.total_bytes or placeholder}",
                result.estimated_query_cost_in_usd or placeholder,
                result.estimated_storage_cost_in_usd or placeholder,
                Text(
                    "\n".join(result.referenced_tables) or placeholder,
                    overflow="fold",
                ),
                Text("\n".join(result.depends_on) or placeholder, overflow="fold"),
                "🟢" if result.has_source else "🔴",
                result.materialization_type or placeholder,
            )

        console = Console(width=settings.CLI_CONSOLE_MAX_WIDTH)
        console.print(table)
    elif format == "json":
        output = {"models": [result.as_dict() for result in results]}
        typer.echo(json.dumps(output))
````
If a machine-readable format is required, the `--format json` option can be used:
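```shell
amora models list --format json
```

which produces a document like the following: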
```json
{
    "models": [
        {
            "depends_on": [
                "Health"
            ],
            "has_source": true,
            "materialization_type": "table",
            "model_name": "Steps",
            "referenced_tables": [],
            "total_bytes": null
        },
        {
            "depends_on": [
                "HeartRate"
            ],
            "has_source": true,
            "materialization_type": "table",
            "model_name": "HeartRateAgg",
            "referenced_tables": [],
            "total_bytes": null
        },
        {
            "depends_on": [],
            "has_source": false,
            "materialization_type": null,
            "model_name": "Health",
            "referenced_tables": [],
            "total_bytes": null
        },
        {
            "depends_on": [
                "Health"
            ],
            "has_source": true,
            "materialization_type": "table",
            "model_name": "HeartRate",
            "referenced_tables": [],
            "total_bytes": null
        },
        {
            "depends_on": [
                "Steps"
            ],
            "has_source": true,
            "materialization_type": "table",
            "model_name": "StepsAgg",
            "referenced_tables": [],
            "total_bytes": null
        }
    ]
}
```
amora models import¶
Generates a new Amora model file from an existing table/view:
```shell
amora models import --table-reference my_gcp_project.my_dataset.my_table my_gcp_project/my_dataset/my_table
```
Source code in amora/cli/models.py
````python
@app.command(name="import")
def models_import(
    table_reference: str = typer.Option(
        ...,
        "--table-reference",
        help="BigQuery unique table identifier. "
        "E.g.: project-id.dataset-id.table-id",
    ),
    model_file_path: str = typer.Argument(
        None,
        help="Canonical name of python module for the generated AmoraModel. "
        "A good pattern would be to use an unique "
        "and deterministic identifier, like: `project_id.dataset_id.table_id`",
    ),
    overwrite: bool = typer.Option(
        False, help="Overwrite the output file if one already exists"
    ),
):
    """
    Generates a new amora model file from an existing table/view

    ```shell
    amora models import --table-reference my_gcp_project.my_dataset.my_table my_gcp_project/my_dataset/my_table
    ```
    """
    env = Environment(
        loader=PackageLoader("amora"),
        autoescape=select_autoescape(),
        trim_blocks=True,
        lstrip_blocks=True,
    )
    template = env.get_template("new-model.py.jinja2")

    project, dataset, table = table_reference.split(".")
    model_name = "".join(part.title() for part in table.split("_"))

    if model_file_path:
        destination_file_path = Path(model_file_path)
        if (
            destination_file_path.is_absolute()
            and settings.models_path not in destination_file_path.parents
        ):
            typer.echo(
                "Destination path must be relative to the configured models path",
                err=True,
            )
            raise typer.Exit(1)
    else:
        destination_file_path = settings.models_path.joinpath(
            model_name.replace(".", "/") + ".py"
        )

    if destination_file_path.exists() and not overwrite:
        typer.echo(
            f"`{destination_file_path}` already exists. "
            f"Pass `--overwrite` to overwrite file.",
            err=True,
        )
        raise typer.Exit(1)

    sorted_schema = sorted(get_schema(table_reference), key=lambda field: field.name)

    model_source_code = template.render(
        BIGQUERY_TYPES_TO_PYTHON_TYPES=BIGQUERY_TYPES_TO_PYTHON_TYPES,
        BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES=BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES,
        dataset=dataset,
        dataset_id=f"{project}.{dataset}",
        model_name=model_name,
        project=project,
        schema=sorted_schema,
        table=table,
    )
    formatted_source_code = shed(model_source_code)

    destination_file_path.parent.mkdir(parents=True, exist_ok=True)
    destination_file_path.write_text(data=formatted_source_code)

    typer.secho(
        f"🎉 Amora Model `{model_name}` (`{table_reference}`) imported!",
        fg=typer.colors.GREEN,
        bold=True,
    )
    typer.secho(f"Current File Path: `{destination_file_path.as_posix()}`")
````
amora feature-store¶
Info: Requires the `feature-store` package extra.
amora feature-store plan¶
Dry-runs registering objects to the Feature Registry.
The `plan` command dry-runs registering one or more definitions (e.g. Entity, Feature View) and produces a markdown-formatted report of all the changes that an `amora feature-store apply` execution would introduce in the Feature Registry.
The changes computed by the `plan` command are informational and are not actually applied to the registry.
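To preview the pending changes without touching the registry:

```shell
amora feature-store plan
```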
Source code in amora/cli/feature_store.py
```python
@app.command(name="plan")
def feature_store_plan():
    """
    Dry-run registering objects to the Feature Registry

    The plan method dry-runs registering one or more definitions (e.g.: Entity, Feature View)
    and produces a markdown formatted report of all the changes that would be introduced
    in the Feature Registry by an `amora feature-store apply` execution.

    The changes computed by the `plan` command are informational, and are not actually applied to the registry.
    """
    from amora.feature_store import fs, registry
    from amora.feature_store.config import settings

    registry_diff, infra_diff, _infra = fs.plan(
        desired_repo_contents=registry.get_repo_contents()
    )

    def records_as_markdown_table(records) -> str:
        return pd.DataFrame.from_records(records).to_markdown(
            tablefmt=settings.MARKDOWN_FORMAT
        )

    def diff_as_markdown_table(diff) -> str:
        records = [dict(o) for o in diff]
        return records_as_markdown_table(records)

    diff = registry.parse_diff(registry_diff)

    typer.echo("## Amora :: Feature Store :: Registry objects diff\n")
    typer.echo(diff_as_markdown_table(diff.objects))

    typer.echo("## Amora :: Feature Store :: Properties diff\n")
    typer.echo(diff_as_markdown_table(diff.properties))

    typer.echo("## Amora :: Feature Store :: Features diff\n")
    for feature_diff in diff.features:
        typer.echo(f"### {feature_diff.name}\n")
        typer.echo(records_as_markdown_table(feature_diff.diff))

    typer.echo("## Amora :: Feature Store :: Infrastructure diff\n")
    typer.echo(infra_diff.to_string())
```
amora feature-store apply¶
1. Scans Python files in your amora project and finds all models defined as feature views.
2. Validates your feature definitions.
3. Syncs the metadata about feature store objects to the feature registry. If a registry does not exist, it will be instantiated. The standard registry is a simple protobuf binary file that is stored on disk (locally or in an object store).
4. Creates all necessary feature store infrastructure. The exact infrastructure that is deployed or configured depends on the provider configuration. For example, setting local as your provider will result in a sqlite online store being created, as shown by the invocation below.
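A single invocation performs all four steps:

```shell
amora feature-store apply
```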
Source code in amora/cli/feature_store.py
```python
@app.command(name="apply")
def feature_store_apply():
    """
    1. Scans Python files in your amora project and find all models defined as
    feature views.

    2. Validate your feature definitions

    3. Sync the metadata about feature store objects to the feature registry.
    If a registry does not exist, then it will be instantiated.
    The standard registry is a simple protobuf binary file
    that is stored on disk (locally or in an object store).

    4. Create all necessary feature store infrastructure.
    The exact infrastructure that is deployed or configured depends
    on the provider configuration. For example, setting local as
    your provider will result in a sqlite online store being created.
    """
    from feast.repo_operations import apply_total_with_repo_instance

    from amora.feature_store import fs
    from amora.feature_store.registry import get_repo_contents

    apply_total_with_repo_instance(
        store=fs,
        project=fs.project,
        registry=fs.registry,
        repo=get_repo_contents(),
        skip_source_validation=False,
    )
```
amora feature-store materialize¶
Runs a (non-incremental) materialization job to ingest data into the online store. All data between `start_ts` and `end_ts` will be read from the offline store and written into the online store. If you don't specify feature view names using `--models`, all registered Feature Views will be materialized.
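For example (timestamps follow the ISO 8601 format shown in the arguments' help text; the feature view name is borrowed from the serve example below):

```shell
# materialize all feature views over a one-day window
amora feature-store materialize '2022-01-01T01:00:00' '2022-01-02T01:00:00'

# materialize a single feature view
amora feature-store materialize '2022-01-01T01:00:00' '2022-01-02T01:00:00' --models step_count_by_source
```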
Source code in amora/cli/feature_store.py
```python
@app.command(name="materialize")
def feature_store_materialize(
    start_ts: str = typer.Argument(
        None,
        help="Start timestamp on ISO 8601 format. E.g.: '2022-01-01T01:00:00'",
    ),
    end_ts: str = typer.Argument(
        None,
        help="End timestamp on ISO 8601 format. E.g.: '2022-01-02T01:00:00'",
    ),
    models: Optional[Models] = models_option,
):
    """
    Run a (non-incremental) materialization job to ingest data into the online
    store. All data between `start_ts` and `end_ts` will be read from the offline
    store and written into the online store. If you don't specify feature view
    names using `--models`, all registered Feature Views will be materialized.
    """
    from amora.feature_store import fs
    from amora.feature_store.registry import get_repo_contents

    repo_contents = get_repo_contents()

    if models:
        views_to_materialize = [
            fv.name for fv in repo_contents.feature_views if fv.name in models
        ]
    else:
        views_to_materialize = [fv.name for fv in repo_contents.feature_views]

    fs.materialize(
        feature_views=views_to_materialize,
        start_date=datetime.fromisoformat(start_ts),
        end_date=datetime.fromisoformat(end_ts),
    )
```
amora feature-store materialize-incremental¶
Loads data from feature views into the online store, beginning from either the previous `materialize` or `materialize-incremental` end date, or the beginning of time.
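For example (the end timestamp is optional and, per the source below, defaults to `datetime.utcnow()`):

```shell
# ingest everything since the previous materialization, up to now
amora feature-store materialize-incremental

# ...or up to an explicit end timestamp
amora feature-store materialize-incremental '2022-01-02T01:00:00'
```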
Source code in amora/cli/feature_store.py
```python
@app.command(name="materialize-incremental")
def feature_store_materialize_incremental(
    end_ts: Optional[str] = typer.Argument(
        None,
        help="End timestamp on ISO 8601 format. E.g.: '2022-01-02T01:00:00'. If a date isn't provided, `datetime.utcnow` is used",
    ),
    models: Optional[Models] = models_option,
):
    """
    Load data from feature views into the online store, beginning from either the previous `materialize`
    or `materialize-incremental` end date, or the beginning of time.
    """
    from amora.feature_store import fs
    from amora.feature_store.registry import get_repo_contents

    repo_contents = get_repo_contents()

    if models:
        views_to_materialize = [
            fv.name for fv in repo_contents.feature_views if fv.name in models
        ]
    else:
        views_to_materialize = [fv.name for fv in repo_contents.feature_views]

    if end_ts is not None:
        end_date = datetime.fromisoformat(end_ts)
    else:
        end_date = datetime.utcnow()

    fs.materialize_incremental(
        feature_views=views_to_materialize,
        end_date=end_date,
    )
```
amora feature-store serve¶
Starts the feature server HTTP app.
Routes:
- `POST /get-online-features`
```shell
curl -XPOST -H "Content-type: application/json" -d '{"features": ["step_count_by_source:value_avg", "step_count_by_source:value_sum", "step_count_by_source:value_count"], "entities": {"source_name": ["Mi Fit", "Diogo iPhone", "An invalid source"]}}' 'http://localhost:8666/get-online-features'
```
```json
{
    "metadata": {
        "feature_names": [
            "source_name",
            "value_count",
            "value_sum",
            "value_avg"
        ]
    },
    "results": [
        {
            "values": [
                "Mi Fit",
                6.0,
                809.0,
                134.8333282470703
            ],
            "statuses": [
                "PRESENT",
                "PRESENT",
                "PRESENT",
                "PRESENT"
            ],
            "event_timestamps": [
                "1970-01-01T00:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z"
            ]
        },
        {
            "values": [
                "Diogo iPhone",
                2.0,
                17.0,
                8.5
            ],
            "statuses": [
                "PRESENT",
                "PRESENT",
                "PRESENT",
                "PRESENT"
            ],
            "event_timestamps": [
                "1970-01-01T00:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z"
            ]
        },
        {
            "values": [
                "An invalid source",
                null,
                null,
                null
            ],
            "statuses": [
                "PRESENT",
                "NOT_FOUND",
                "NOT_FOUND",
                "NOT_FOUND"
            ],
            "event_timestamps": [
                "1970-01-01T00:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z"
            ]
        }
    ]
}
```
More on: https://docs.feast.dev/v/v0.9-branch/user-guide/getting-online-features
- `GET /list-feature-views`. E.g.:
```shell
curl http://localhost:8666/list-feature-views | jq
```
```json
[
    {
        "name": "step_count_by_source",
        "features": [
            "step_count_by_source:value_avg",
            "step_count_by_source:value_sum",
            "step_count_by_source:value_count"
        ],
        "entities": [
            "source_name"
        ]
    }
]
```
Source code in amora/cli/feature_store.py
````python
@app.command(name="serve")
def feature_store_serve():
    """
    Starts the feature server HTTP app.

    Routes:

    - `POST /get-online-features`

    `curl -XPOST -H "Content-type: application/json" -d '{"features": ["step_count_by_source:value_avg", "step_count_by_source:value_sum", "step_count_by_source:value_count"], "entities": {"source_name": ["Mi Fit", "Diogo iPhone", "An invalid source"]}}' 'http://localhost:8666/get-online-features'`

    ```json
    {
        "metadata": {
            "feature_names": [
                "source_name",
                "value_count",
                "value_sum",
                "value_avg"
            ]
        },
        "results": [
            {
                "values": [
                    "Mi Fit",
                    6.0,
                    809.0,
                    134.8333282470703
                ],
                "statuses": [
                    "PRESENT",
                    "PRESENT",
                    "PRESENT",
                    "PRESENT"
                ],
                "event_timestamps": [
                    "1970-01-01T00:00:00Z",
                    "2021-07-23T02:00:00Z",
                    "2021-07-23T02:00:00Z",
                    "2021-07-23T02:00:00Z"
                ]
            },
            {
                "values": [
                    "Diogo iPhone",
                    2.0,
                    17.0,
                    8.5
                ],
                "statuses": [
                    "PRESENT",
                    "PRESENT",
                    "PRESENT",
                    "PRESENT"
                ],
                "event_timestamps": [
                    "1970-01-01T00:00:00Z",
                    "2021-07-23T02:00:00Z",
                    "2021-07-23T02:00:00Z",
                    "2021-07-23T02:00:00Z"
                ]
            },
            {
                "values": [
                    "An invalid source",
                    null,
                    null,
                    null
                ],
                "statuses": [
                    "PRESENT",
                    "NOT_FOUND",
                    "NOT_FOUND",
                    "NOT_FOUND"
                ],
                "event_timestamps": [
                    "1970-01-01T00:00:00Z",
                    "2021-07-23T02:00:00Z",
                    "2021-07-23T02:00:00Z",
                    "2021-07-23T02:00:00Z"
                ]
            }
        ]
    }
    ```

    More on: https://docs.feast.dev/v/v0.9-branch/user-guide/getting-online-features

    - `GET /list-feature-views`. E.g.:

    `curl http://localhost:8666/list-feature-views | jq`

    ```json
    [
        {
            "name": "step_count_by_source",
            "features": [
                "step_count_by_source:value_avg",
                "step_count_by_source:value_sum",
                "step_count_by_source:value_count"
            ],
            "entities": [
                "source_name"
            ]
        }
    ]
    ```
    """
    import uvicorn
    from feast.feature_server import get_app
    from prometheus_fastapi_instrumentator import Instrumentator

    from amora.feature_store import fs
    from amora.feature_store.config import settings

    app = get_app(store=fs)

    @app.get("/list-feature-views")
    def list_feature_views():
        fvs = fs.list_feature_views()
        return [
            {
                "name": fv.name,
                "features": [f"{fv.name}:{feature.name}" for feature in fv.features],
                "entities": [entity for entity in fv.entities],
                "description": fv.description,
            }
            for fv in fvs
        ]

    Instrumentator().instrument(app).expose(app)

    uvicorn.run(
        app,
        host=settings.HTTP_SERVER_HOST,
        port=settings.HTTP_SERVER_PORT,
        access_log=settings.HTTP_ACCESS_LOG_ENABLED,
    )
````