Skip to content

CLI

Amora comes packed with a Command Line Interface, developed using tiangolo/typer. To check all the options, type amora --help after the installation.

Usage: amora [OPTIONS] COMMAND [ARGS]...

  Amora Data Build Tool enables engineers to transform data in their
  warehouses by defining schemas and writing select statements with
  SQLAlchemy. Amora handles turning these select statements into tables and
  views

Options:
  --install-completion  Install completion for the current shell.
  --show-completion     Show completion for the current shell, to copy it or
                        customize the installation.
  --help                Show this message and exit.

Commands:
  compile      Generates executable SQL from model files.
  materialize  Executes the compiled SQL against the current target...
  models
  test         Runs tests on data in deployed models.

amora compile

Generates executable SQL from model files. Compiled SQL files are written to the ./target directory.

Source code in amora/cli/typer_app.py
@app.command()
def compile(
    models: Optional[Models] = models_option,
    target: Optional[str] = target_option,
    force: Optional[bool] = force_option,
) -> None:
    """
    Generates executable SQL from model files. Compiled SQL files are written to the `./target` directory.
    """

    current_manifest = manifest.Manifest.from_project()
    previous_manifest = manifest.Manifest.load()

    if previous_manifest and not force:
        # Incremental compilation: drop artifacts of models that no longer
        # exist and recompile only what changed since the previous manifest.
        deleted_models = previous_manifest.models.keys() - current_manifest.models.keys()
        compilation.remove_compiled_files(deleted_models)
        models_to_compile = current_manifest.get_models_to_compile(previous_manifest)
    else:
        # Full rebuild: wipe every compiled artifact and compile all models.
        compilation.remove_compiled_files()
        models_to_compile = list_models()

    for model, model_file_path in models_to_compile:
        # `--models` narrows the selection, except on forced full rebuilds.
        excluded_by_filter = bool(models) and not force and model_file_path.stem not in models
        if excluded_by_filter:
            continue

        statement = model.source()
        if statement is None:
            # Sourceless models have nothing to compile.
            typer.echo(f"โญ Skipping compilation of model `{model_file_path}`")
            continue

        target_file_path = model.target_path()
        typer.echo(f"๐Ÿ— Compiling model `{model_file_path}` -> `{target_file_path}`")

        target_file_path.parent.mkdir(parents=True, exist_ok=True)
        target_file_path.write_text(compilation.compile_statement(statement))

    current_manifest.save()

amora materialize

Executes the compiled SQL against the current target database.

Source code in amora/cli/typer_app.py
@app.command()
def materialize(
    models: Optional[Models] = models_option,
    target: str = target_option,
    draw_dag: bool = typer.Option(False, "--draw-dag"),
    depends: bool = typer.Option(
        False, "--depends", help="Flag to materialize also the dependents of the model"
    ),
    no_compile: bool = typer.Option(
        False,
        "--no-compile",
        help="Don't run `amora compile` before the materialization",
    ),
) -> None:
    """
    Executes the compiled SQL against the current target database.
    """
    if not no_compile:
        # NOTE(review): `models != []` is also True when `models` is None,
        # so an unfiltered `--depends` run forces a full recompile — confirm intended.
        force = depends and models != []
        compile(models=models, target=target, force=force)

    # Maps each model's unique name to its materialization task.
    model_to_task: Dict[str, materialization.Task] = {}

    for target_file_path in utils.list_target_files():
        task = materialization.Task.for_target(target_file_path)

        if models and target_file_path.stem not in models:
            continue

        model_to_task[task.model.unique_name()] = task

        if depends:
            # Also schedule every transitive dependency of the selected model.
            for dependency_target_path in utils.recursive_dependencies_targets(
                task.model
            ):
                dependency_task = materialization.Task.for_target(
                    dependency_target_path
                )
                model_to_task[dependency_task.model.unique_name()] = dependency_task

    dag = DependencyDAG.from_tasks(tasks=model_to_task.values())

    if draw_dag:
        dag.draw()

    with futures.ProcessPoolExecutor(
        max_workers=settings.MATERIALIZE_NUM_THREADS
    ) as executor:
        # Run generation by generation so that a model is only materialized
        # after all of its upstream dependencies.
        for models_to_materialize in dag.topological_generations():
            current_tasks: List[materialization.Task] = []
            for model_name in models_to_materialize:
                if model_name in model_to_task:
                    current_tasks.append(model_to_task[model_name])
                else:
                    # In the DAG but not selected for this run.
                    # (Fixed: removed a redundant trailing `continue` here.)
                    typer.echo(f"โš ๏ธ  Skipping `{model_name}`")

            if not current_tasks:
                continue

            results = executor.map(
                materialization.materialize,
                [current_task.sql_stmt for current_task in current_tasks],
                [current_task.model.unique_name() for current_task in current_tasks],
                [current_task.model.__model_config__ for current_task in current_tasks],
            )

            for result in results:
                if result:
                    typer.echo(result)

amora test

Runs tests on data in deployed models. Run this after amora materialize to ensure that the data state is up-to-date. Optional arguments are passed to pytest.

Source code in amora/cli/typer_app.py
@app.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
)
def test(ctx: typer.Context) -> None:
    """
    Runs tests on data in deployed models. Run this after `amora materialize`
    to ensure that the data state is up-to-date. Optional arguments are passed
    to pytest.
    """
    # Any extra CLI arguments are appended to the project's default pytest
    # arguments and forwarded verbatim; exit with pytest's own return code.
    raise typer.Exit(pytest.main(settings.DEFAULT_PYTEST_ARGS + ctx.args))

amora models

amora models list

List the models in your project as a human readable table or as a JSON serialized document

amora models list
You can also use the option --with-total-bytes to use the BigQuery query dry run feature to gather model total bytes information

amora models list --with-total-bytes
Source code in amora/cli/models.py
@app.command(name="list")
def models_list(
    format: str = typer.Option(
        "table",
        help="Output format. Options: json,table",
    ),
    with_total_bytes: bool = typer.Option(
        False,
        help="Uses BigQuery query dry run feature "
        "to gather model total bytes information",
    ),
) -> None:
    """
    List the models in your project as a human readable table
    or as a JSON serialized document

    ```shell
    amora models list
    ```
    You can also use the option `--with-total-bytes` to use
    BigQuery query dry run feature to gather model total bytes information

    ```shell
    amora models list --with-total-bytes
    ```

    """

    # View-model wrapping an Amora model plus its (optional) BigQuery
    # dry-run result; exposes the fields rendered by both output formats.
    @dataclass
    class ResultItem:
        model: Model
        dry_run_result: Optional[DryRunResult] = None

        def as_dict(self):
            # JSON-serializable representation, used by `--format json`.
            return {
                "depends_on": self.depends_on,
                "has_source": self.has_source,
                "materialization_type": self.materialization_type,
                "model_name": self.model_name,
                "referenced_tables": self.referenced_tables,
                "total_bytes": self.total_bytes,
                "estimated_query_cost_in_usd": self.estimated_query_cost_in_usd,
                "estimated_storage_cost_in_usd": self.estimated_storage_cost_in_usd,
            }

        @property
        def model_name(self):
            # The model's Python class name.
            return self.model.__name__

        @property
        def has_source(self):
            # True when the model defines a SQL source statement.
            return self.model.source() is not None

        @property
        def depends_on(self) -> List[str]:
            # Sorted names of the model's direct dependencies.
            return sorted(dependency.name for dependency in self.model.dependencies())

        @property
        def estimated_query_cost_in_usd(self) -> Optional[str]:
            # Cost estimate formatted to the configured decimal places;
            # None when no dry run was performed.
            if self.dry_run_result:
                cost = estimated_query_cost_in_usd(self.dry_run_result.total_bytes)
                return f"{cost:.{settings.MONEY_DECIMAL_PLACES}f}"
            return None

        @property
        def estimated_storage_cost_in_usd(self) -> Optional[str]:
            # Same formatting as the query cost, but for storage.
            if self.dry_run_result:
                cost = estimated_storage_cost_in_usd(self.dry_run_result.total_bytes)
                return f"{cost:.{settings.MONEY_DECIMAL_PLACES}f}"
            return None

        @property
        def total_bytes(self) -> Optional[int]:
            # Bytes the model's query would scan; None without a dry run.
            if self.dry_run_result:
                return self.dry_run_result.total_bytes
            return None

        @property
        def referenced_tables(self) -> List[str]:
            # Tables referenced by the model's query; empty without a dry run.
            if self.dry_run_result:
                return self.dry_run_result.referenced_tables
            return []

        @property
        def materialization_type(self) -> Optional[str]:
            # e.g. "table" or "view"; only meaningful for models with a source.
            if self.has_source:
                return self.model.__model_config__.materialized.value

            return None

    results = []
    placeholder = "-"  # shown in table cells with no value

    # A dry run per model is only issued when explicitly requested, since it
    # hits BigQuery.
    for model, _model_file_path in list_models():
        if with_total_bytes:
            result_item = ResultItem(model=model, dry_run_result=dry_run(model))
        else:
            result_item = ResultItem(model=model, dry_run_result=None)

        results.append(result_item)

    if format == "table":
        table = Table(
            show_header=True,
            header_style="bold",
            show_lines=True,
            width=settings.CLI_CONSOLE_MAX_WIDTH,
            row_styles=["none", "dim"],
        )

        table.add_column("Model name", style="green bold", no_wrap=True)
        table.add_column("Total bytes", no_wrap=True)
        table.add_column("Estimated query cost", no_wrap=True)
        table.add_column("Estimated storage cost", no_wrap=True)
        table.add_column("Referenced tables")
        table.add_column("Depends on")
        table.add_column("Has source?", no_wrap=True, justify="center")
        table.add_column("Materialization", no_wrap=True)

        for result in results:
            table.add_row(
                result.model_name,
                f"{result.total_bytes or placeholder}",
                result.estimated_query_cost_in_usd or placeholder,
                result.estimated_storage_cost_in_usd or placeholder,
                Text(
                    "\n".join(result.referenced_tables) or placeholder,
                    overflow="fold",
                ),
                Text("\n".join(result.depends_on) or placeholder, overflow="fold"),
                "๐ŸŸข" if result.has_source else "๐Ÿ”ด",
                result.materialization_type or placeholder,
            )

        console = Console(width=settings.CLI_CONSOLE_MAX_WIDTH)
        console.print(table)

    elif format == "json":
        output = {"models": [result.as_dict() for result in results]}
        typer.echo(json.dumps(output))
    # NOTE(review): any other `--format` value silently prints nothing —
    # confirm whether an error message is desired.

amora models list


amora models list --with-total-bytes


If a machine readable format is required, the --format json option can be used as follows:

$ amora models list --format json
{
  "models": [
    {
      "depends_on": [
        "Health"
      ],
      "has_source": true,
      "materialization_type": "table",
      "model_name": "Steps",
      "referenced_tables": [],
      "total_bytes": null
    },
    {
      "depends_on": [
        "HeartRate"
      ],
      "has_source": true,
      "materialization_type": "table",
      "model_name": "HeartRateAgg",
      "referenced_tables": [],
      "total_bytes": null
    },
    {
      "depends_on": [],
      "has_source": false,
      "materialization_type": null,
      "model_name": "Health",
      "referenced_tables": [],
      "total_bytes": null
    },
    {
      "depends_on": [
        "Health"
      ],
      "has_source": true,
      "materialization_type": "table",
      "model_name": "HeartRate",
      "referenced_tables": [],
      "total_bytes": null
    },
    {
      "depends_on": [
        "Steps"
      ],
      "has_source": true,
      "materialization_type": "table",
      "model_name": "StepsAgg",
      "referenced_tables": [],
      "total_bytes": null
    }
  ]
}

amora models import

Generates a new amora model file from an existing table/view

amora models import --table-reference my_gcp_project.my_dataset.my_table my_gcp_project/my_dataset/my_table
Source code in amora/cli/models.py
@app.command(name="import")
def models_import(
    table_reference: str = typer.Option(
        ...,
        "--table-reference",
        help="BigQuery unique table identifier. "
        "E.g.: project-id.dataset-id.table-id",
    ),
    model_file_path: str = typer.Argument(
        None,
        help="Canonical name of python module for the generated AmoraModel. "
        "A good pattern would be to use an unique "
        "and deterministic identifier, like: `project_id.dataset_id.table_id`",
    ),
    overwrite: bool = typer.Option(
        False, help="Overwrite the output file if one already exists"
    ),
):
    """
    Generates a new amora model file from an existing table/view

    ```shell
    amora models import --table-reference my_gcp_project.my_dataset.my_table my_gcp_project/my_dataset/my_table
    ```
    """

    # Jinja environment loading the model template shipped inside the
    # `amora` package.
    env = Environment(
        loader=PackageLoader("amora"),
        autoescape=select_autoescape(),
        trim_blocks=True,
        lstrip_blocks=True,
    )
    template = env.get_template("new-model.py.jinja2")

    # Expects exactly `project.dataset.table`; unpacking raises ValueError
    # for any other shape.
    project, dataset, table = table_reference.split(".")
    # "my_table" -> "MyTable"
    model_name = "".join(part.title() for part in table.split("_"))

    if model_file_path:
        destination_file_path = Path(model_file_path)
        # NOTE(review): only absolute paths are validated against the models
        # path; a relative path pointing outside of it is accepted — confirm
        # this is intended.
        if (
            destination_file_path.is_absolute()
            and settings.models_path not in destination_file_path.parents
        ):
            typer.echo(
                "Destination path must be relative to the configured models path",
                err=True,
            )
            raise typer.Exit(1)
    else:
        # NOTE(review): `model_name` is built from "_"-split parts and can
        # never contain "."; this `replace` looks like a no-op — was
        # `table_reference` meant here?
        destination_file_path = settings.models_path.joinpath(
            model_name.replace(".", "/") + ".py"
        )

    # Refuse to clobber an existing file unless `--overwrite` was passed.
    if destination_file_path.exists() and not overwrite:
        typer.echo(
            f"`{destination_file_path}` already exists. "
            f"Pass `--overwrite` to overwrite file.",
            err=True,
        )
        raise typer.Exit(1)

    # Columns are rendered in deterministic (alphabetical) order.
    sorted_schema = sorted(get_schema(table_reference), key=lambda field: field.name)
    model_source_code = template.render(
        BIGQUERY_TYPES_TO_PYTHON_TYPES=BIGQUERY_TYPES_TO_PYTHON_TYPES,
        BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES=BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES,
        dataset=dataset,
        dataset_id=f"{project}.{dataset}",
        model_name=model_name,
        project=project,
        schema=sorted_schema,
        table=table,
    )
    # Auto-format the rendered module before writing it out.
    formatted_source_code = shed(model_source_code)

    destination_file_path.parent.mkdir(parents=True, exist_ok=True)
    destination_file_path.write_text(data=formatted_source_code)

    typer.secho(
        f"๐ŸŽ‰ Amora Model `{model_name}` (`{table_reference}`) imported!",
        fg=typer.colors.GREEN,
        bold=True,
    )
    typer.secho(f"Current File Path: `{destination_file_path.as_posix()}`")

amora feature-store

Info

Requires the feature-store package extra

amora feature-store plan

Dry-run registering objects to the Feature Registry

The plan method dry-runs registering one or more definitions (e.g.: Entity, Feature View) and produces a markdown formatted report of all the changes that would be introduced in the Feature Registry by an amora feature-store apply execution.

The changes computed by the plan command are informational, and are not actually applied to the registry.

Source code in amora/cli/feature_store.py
@app.command(name="plan")
def feature_store_plan():
    """
    Dry-run registering objects to the Feature Registry

    The plan method dry-runs registering one or more definitions (e.g.: Entity, Feature View)
    and produces a markdown formatted report of all the changes that would be introduced
    in the Feature Registry by an `amora feature-store apply` execution.

    The changes computed by the `plan` command are informational, and are not actually applied to the registry.
    """

    from amora.feature_store import fs, registry
    from amora.feature_store.config import settings

    registry_diff, infra_diff, _infra = fs.plan(
        desired_repo_contents=registry.get_repo_contents()
    )

    def as_markdown_table(records) -> str:
        # Render a sequence of records as a markdown table.
        frame = pd.DataFrame.from_records(records)
        return frame.to_markdown(tablefmt=settings.MARKDOWN_FORMAT)

    parsed = registry.parse_diff(registry_diff)

    # Each section of the report is a markdown heading followed by a table.
    typer.echo("## Amora :: Feature Store :: Registry objects diff\n")
    typer.echo(as_markdown_table([dict(obj) for obj in parsed.objects]))

    typer.echo("## Amora :: Feature Store :: Properties diff\n")
    typer.echo(as_markdown_table([dict(obj) for obj in parsed.properties]))

    typer.echo("## Amora :: Feature Store :: Features diff\n")
    for feature_diff in parsed.features:
        typer.echo(f"### {feature_diff.name}\n")
        typer.echo(as_markdown_table(feature_diff.diff))

    typer.echo("## Amora :: Feature Store :: Infrastructure diff\n")
    typer.echo(infra_diff.to_string())

amora feature-store apply

  1. Scans Python files in your amora project and find all models defined as feature views.

  2. Validate your feature definitions

  3. Sync the metadata about feature store objects to the feature registry. If a registry does not exist, then it will be instantiated. The standard registry is a simple protobuf binary file that is stored on disk (locally or in an object store).

  4. Create all necessary feature store infrastructure. The exact infrastructure that is deployed or configured depends on the provider configuration. For example, setting local as your provider will result in a sqlite online store being created.

Source code in amora/cli/feature_store.py
@app.command(name="apply")
def feature_store_apply():
    """
    1. Scans Python files in your amora project and find all models defined as
    feature views.

    2. Validate your feature definitions

    3. Sync the metadata about feature store objects to the feature registry.
    If a registry does not exist, then it will be instantiated.
    The standard registry is a simple protobuf binary file
    that is stored on disk (locally or in an object store).

    4. Create all necessary feature store infrastructure.
    The exact infrastructure that is deployed or configured depends
    on the provider configuration. For example, setting local as
    your provider will result in a sqlite online store being created.
    """
    from feast.repo_operations import apply_total_with_repo_instance

    from amora.feature_store import fs
    from amora.feature_store.registry import get_repo_contents

    # Collect the desired state from the project, then delegate the full
    # apply (registry sync + infrastructure provisioning) to feast.
    repo_contents = get_repo_contents()
    apply_total_with_repo_instance(
        store=fs,
        project=fs.project,
        registry=fs.registry,
        repo=repo_contents,
        skip_source_validation=False,
    )

amora feature-store materialize

Run a (non-incremental) materialization job to ingest data into the online store. All data between start_ts and end_ts will be read from the offline store and written into the online store. If you don't specify feature view names using --models, all registered Feature Views will be materialized.

Source code in amora/cli/feature_store.py
@app.command(name="materialize")
def feature_store_materialize(
    start_ts: str = typer.Argument(
        None,
        help="Start timestamp on ISO 8601 format. E.g.: '2022-01-01T01:00:00'",
    ),
    end_ts: str = typer.Argument(
        None,
        help="End timestamp on ISO 8601 format. E.g.: '2022-01-02T01:00:00'",
    ),
    models: Optional[Models] = models_option,
):
    """
    Run a (non-incremental) materialization job to ingest data into the online
    store. All data between `start_ts` and `end_ts` will be read from the offline
    store and written into the online store. If you don't specify feature view
    names using `--models`, all registered Feature Views will be materialized.
    """
    from amora.feature_store import fs
    from amora.feature_store.registry import get_repo_contents

    feature_views = get_repo_contents().feature_views

    # Without a `--models` filter, every registered Feature View is selected.
    views_to_materialize = [
        fv.name for fv in feature_views if not models or fv.name in models
    ]

    fs.materialize(
        feature_views=views_to_materialize,
        start_date=datetime.fromisoformat(start_ts),
        end_date=datetime.fromisoformat(end_ts),
    )

amora feature-store materialize-incremental

Load data from feature views into the online store, beginning from either the previous materialize or materialize-incremental end date, or the beginning of time.

Source code in amora/cli/feature_store.py
@app.command(name="materialize-incremental")
def feature_store_materialize_incremental(
    end_ts: Optional[str] = typer.Argument(
        None,
        help="End timestamp on ISO 8601 format. E.g.: '2022-01-02T01:00:00'. If a date isn't provided, `datetime.utcnow` is used",
    ),
    models: Optional[Models] = models_option,
):
    """
    Load data from feature views into the online store, beginning from either the previous `materialize`
    or `materialize-incremental` end date, or the beginning of time.

    """
    from amora.feature_store import fs
    from amora.feature_store.registry import get_repo_contents

    feature_views = get_repo_contents().feature_views

    # Without a `--models` filter, every registered Feature View is selected.
    views_to_materialize = [
        fv.name for fv in feature_views if not models or fv.name in models
    ]

    # Default the upper bound of the window to "now" when no end timestamp
    # was supplied.
    end_date = datetime.fromisoformat(end_ts) if end_ts is not None else datetime.utcnow()

    fs.materialize_incremental(
        feature_views=views_to_materialize,
        end_date=end_date,
    )

amora feature-store serve

Starts the feature server HTTP app.

Routes:

- `POST /get-online-features`

`curl -XPOST -H "Content-type: application/json" -d '{"features": ["step_count_by_source:value_avg", "step_count_by_source:value_sum", "step_count_by_source:value_count"], "entities": {"source_name": ["Mi Fit", "Diogo iPhone", "An invalid source"]}}' 'http://localhost:8666/get-online-features'`

```json
{
  "metadata": {
    "feature_names": [
      "source_name",
      "value_count",
      "value_sum",
      "value_avg"
    ]
  },
  "results": [
    {
      "values": [
        "Mi Fit",
        6.0,
        809.0,
        134.8333282470703
      ],
      "statuses": [
        "PRESENT",
        "PRESENT",
        "PRESENT",
        "PRESENT"
      ],
      "event_timestamps": [
        "1970-01-01T00:00:00Z",
        "2021-07-23T02:00:00Z",
        "2021-07-23T02:00:00Z",
        "2021-07-23T02:00:00Z"
      ]
    },
    {
      "values": [
        "Diogo iPhone",
        2.0,
        17.0,
        8.5
      ],
      "statuses": [
        "PRESENT",
        "PRESENT",
        "PRESENT",
        "PRESENT"
      ],
      "event_timestamps": [
        "1970-01-01T00:00:00Z",
        "2021-07-23T02:00:00Z",
        "2021-07-23T02:00:00Z",
        "2021-07-23T02:00:00Z"
      ]
    },
    {
      "values": [
        "An invalid source",
        null,
        null,
        null
      ],
      "statuses": [
        "PRESENT",
        "NOT_FOUND",
        "NOT_FOUND",
        "NOT_FOUND"
      ],
      "event_timestamps": [
        "1970-01-01T00:00:00Z",
        "2021-07-23T02:00:00Z",
        "2021-07-23T02:00:00Z",
        "2021-07-23T02:00:00Z"
      ]
    }
  ]
}
```

More on: https://docs.feast.dev/v/v0.9-branch/user-guide/getting-online-features

- `GET /list-feature-views`. E.g.:

`curl http://localhost:8666/list-feature-views | jq`

```json
[
    {
        "name": "step_count_by_source",
        "features": [
            "step_count_by_source:value_avg",
            "step_count_by_source:value_sum",
            "step_count_by_source:value_count"
        ],
        "entities": [
            "source_name"
        ]
    }
]
```
Source code in amora/cli/feature_store.py
@app.command(name="serve")
def feature_store_serve():
    """
    Starts the feature server HTTP app.

    Routes:

        - `POST /get-online-features`

        `curl -XPOST -H "Content-type: application/json" -d '{"features": ["step_count_by_source:value_avg", "step_count_by_source:value_sum", "step_count_by_source:value_count"], "entities": {"source_name": ["Mi Fit", "Diogo iPhone", "An invalid source"]}}' 'http://localhost:8666/get-online-features'`

        ```json
        {
          "metadata": {
            "feature_names": [
              "source_name",
              "value_count",
              "value_sum",
              "value_avg"
            ]
          },
          "results": [
            {
              "values": [
                "Mi Fit",
                6.0,
                809.0,
                134.8333282470703
              ],
              "statuses": [
                "PRESENT",
                "PRESENT",
                "PRESENT",
                "PRESENT"
              ],
              "event_timestamps": [
                "1970-01-01T00:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z"
              ]
            },
            {
              "values": [
                "Diogo iPhone",
                2.0,
                17.0,
                8.5
              ],
              "statuses": [
                "PRESENT",
                "PRESENT",
                "PRESENT",
                "PRESENT"
              ],
              "event_timestamps": [
                "1970-01-01T00:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z"
              ]
            },
            {
              "values": [
                "An invalid source",
                null,
                null,
                null
              ],
              "statuses": [
                "PRESENT",
                "NOT_FOUND",
                "NOT_FOUND",
                "NOT_FOUND"
              ],
              "event_timestamps": [
                "1970-01-01T00:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z",
                "2021-07-23T02:00:00Z"
              ]
            }
          ]
        }
        ```

        More on: https://docs.feast.dev/v/v0.9-branch/user-guide/getting-online-features

        - `GET /list-feature-views`. E.g.:

        `curl http://localhost:8666/list-feature-views | jq`

        ```json
        [
            {
                "name": "step_count_by_source",
                "features": [
                    "step_count_by_source:value_avg",
                    "step_count_by_source:value_sum",
                    "step_count_by_source:value_count"
                ],
                "entities": [
                    "source_name"
                ]
            }
        ]
        ```
    """
    # Imports are deferred so the command only requires the `feature-store`
    # package extra when actually invoked.
    import uvicorn
    from feast.feature_server import get_app
    from prometheus_fastapi_instrumentator import Instrumentator

    from amora.feature_store import fs
    from amora.feature_store.config import settings

    # Feast's stock feature-server ASGI app, bound to this project's store.
    app = get_app(store=fs)

    @app.get("/list-feature-views")
    def list_feature_views():
        # Extra route on top of the stock feast server: lists every
        # registered feature view with its features, entities and description.
        fvs = fs.list_feature_views()
        return [
            {
                "name": fv.name,
                "features": [f"{fv.name}:{feature.name}" for feature in fv.features],
                "entities": [entity for entity in fv.entities],
                "description": fv.description,
            }
            for fv in fvs
        ]

    # Instrument the app with Prometheus metrics and expose them over HTTP.
    Instrumentator().instrument(app).expose(app)

    # Blocks serving the app until the process is stopped.
    uvicorn.run(
        app,
        host=settings.HTTP_SERVER_HOST,
        port=settings.HTTP_SERVER_PORT,
        access_log=settings.HTTP_ACCESS_LOG_ENABLED,
    )

Last update: 2023-11-23