
BigQuery

Running queries

Executes a given query and returns its results and metadata as an amora.providers.bigquery.RunResult

Source code in amora/providers/bigquery.py
@log_execution()
def run(statement: Compilable) -> RunResult:
    """
    Executes a given query and returns its results
    and metadata as an `amora.providers.bigquery.RunResult`
    """
    query = compile_statement(statement)
    query_job = get_client().query(query)
    rows = query_job.result()
    execution_time_delta = query_job.ended - query_job.started

    return RunResult(
        execution_time_in_ms=execution_time_delta.microseconds / 1000,
        job_id=query_job.job_id,
        query=query,
        referenced_tables=[
            ".".join(table.to_api_repr().values())
            for table in query_job.referenced_tables
        ],
        rows=rows,
        schema=query_job.schema,
        total_bytes=query_job.total_bytes_billed,
        user_email=query_job.user_email,
        to_dataframe=query_job.to_dataframe,
    )
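For reference, a minimal usage sketch (not part of the library source): it assumes a `HeartRate` model like the one used in the `dry_run` example below, whose import path is not shown on this page.

```python
from sqlalchemy import func, select

from amora.providers.bigquery import run

# HeartRate: an AmoraModel assumed to be defined in your project,
# as in the dry_run example below.
result = run(select(func.count()).select_from(HeartRate))

print(result.execution_time_in_ms, result.total_bytes)
for row in result.rows:
    print(row)
```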
You can use the estimate returned by the dry run to calculate query
costs in the pricing calculator. Also useful to verify user permissions
and query validity. You are not charged for performing the dry run.

Read more: https://cloud.google.com/bigquery/docs/dry-run-queries

E.g:
```python
dry_run(HeartRate)
```

Will result in:

```python
DryRunResult(
    total_bytes_processed=170181834,
    query="SELECT

health.creationDate, health.device, health.endDate, health.id, health.sourceName, health.startDate, health.unit, health.value FROM diogo.health WHERE health.type = 'HeartRate'", model=HeartRate, referenced_tables=["amora-data-build-tool.diogo.health"], schema=[ SchemaField("creationDate", "TIMESTAMP", "NULLABLE", None, (), None), SchemaField("device", "STRING", "NULLABLE", None, (), None), SchemaField("endDate", "TIMESTAMP", "NULLABLE", None, (), None), SchemaField("id", "INTEGER", "NULLABLE", None, (), None), SchemaField("sourceName", "STRING", "NULLABLE", None, (), None), SchemaField("startDate", "TIMESTAMP", "NULLABLE", None, (), None), SchemaField("unit", "STRING", "NULLABLE", None, (), None), SchemaField("value", "FLOAT", "NULLABLE", None, (), None), ], ) ```

Source code in amora/providers/bigquery.py
@log_execution()
def dry_run(model: Model) -> Optional[DryRunResult]:
    """
    You can use the estimate returned by the dry run to calculate query
    costs in the pricing calculator. Also useful to verify user permissions
    and query validity. You are not charged for performing the dry run.

    Read more: https://cloud.google.com/bigquery/docs/dry-run-queries

    E.g:
    ```python
    dry_run(HeartRate)
    ```

    Will result in:

    ```python
    DryRunResult(
        total_bytes_processed=170181834,
        query="SELECT\n  `health`.`creationDate`,\n  `health`.`device`,\n  `health`.`endDate`,\n  `health`.`id`,\n  `health`.`sourceName`,\n  `health`.`startDate`,\n  `health`.`unit`,\n  `health`.`value`\nFROM `diogo`.`health`\nWHERE `health`.`type` = 'HeartRate'",
        model=HeartRate,
        referenced_tables=["amora-data-build-tool.diogo.health"],
        schema=[
            SchemaField("creationDate", "TIMESTAMP", "NULLABLE", None, (), None),
            SchemaField("device", "STRING", "NULLABLE", None, (), None),
            SchemaField("endDate", "TIMESTAMP", "NULLABLE", None, (), None),
            SchemaField("id", "INTEGER", "NULLABLE", None, (), None),
            SchemaField("sourceName", "STRING", "NULLABLE", None, (), None),
            SchemaField("startDate", "TIMESTAMP", "NULLABLE", None, (), None),
            SchemaField("unit", "STRING", "NULLABLE", None, (), None),
            SchemaField("value", "FLOAT", "NULLABLE", None, (), None),
        ],
    )
    ```
    """
    client = get_client()
    source = model.source()
    if source is None:
        table = client.get_table(model.fully_qualified_name())

        if table.table_type == "VIEW":
            query_job = client.query(
                query=table.view_query,
                job_config=QueryJobConfig(dry_run=True, use_query_cache=False),
            )
            return DryRunResult(
                job_id=query_job.job_id,
                model=model,
                query=table.view_query,
                referenced_tables=[
                    ".".join(table.to_api_repr().values())
                    for table in query_job.referenced_tables
                ],
                schema=query_job.schema,
                total_bytes=query_job.total_bytes_processed,
                user_email=query_job.user_email,
            )

        return DryRunResult(
            job_id=None,
            model=model,
            query=None,
            referenced_tables=[str(table.reference)],
            schema=table.schema,
            total_bytes=table.num_bytes,
            user_email=None,
        )

    query = compile_statement(source)
    try:
        query_job = client.query(
            query=query,
            job_config=QueryJobConfig(dry_run=True, use_query_cache=False),
        )
    except NotFound:
        logger.exception(
            "The query may contain model references that are not materialized.",
            extra={"sql": query},
        )
        return None
    else:
        return DryRunResult(
            job_id=query_job.job_id,
            total_bytes=query_job.total_bytes_processed,
            referenced_tables=[
                ".".join(table.to_api_repr().values())
                for table in query_job.referenced_tables
            ],
            query=query,
            model=model,
            schema=query_job.schema,
            user_email=query_job.user_email,
        )

Building a CTE from literal values

Returns a table-like selectable (CTE) for the given hardcoded values.

E.g:

rows = [{"numeric_column": "123"}, {"numeric_column": "234"}, {"numeric_column": "345"}]
cte_from_rows(rows)

Will result in the following SQL

```sql
    WITH `anon_1` AS (
        SELECT "123" AS numeric_column
        UNION ALL SELECT "234" AS numeric_column
        UNION ALL SELECT "345" AS numeric_column
    )
```

Which would render a table like:

| numeric_column |
|----------------|
| 123            |
| 234            |
| 345            |

Useful both for model writing and testing purposes. Think of cte_from_rows as a way of generating a "temporary table-like object", with data available at runtime.

Source code in amora/providers/bigquery.py
def cte_from_rows(rows: Iterable[Dict[Hashable, Any]]) -> CTE:
    """
    Returns a table like selectable (CTE) for the given hardcoded values.

    E.g:
    ```python
    rows = [{"numeric_column": "123"}, {"numeric_column": "234"}, {"numeric_column": "345"}]
    cte_from_rows(rows)
    ```

    Will result in the following SQL

    ```sql
        WITH `anon_1` AS (
            SELECT "123" AS numeric_column
            UNION ALL SELECT "234" AS numeric_column
            UNION ALL SELECT "345" AS numeric_column
        )
    ```

    Which would render a table like:

    ```md
    | numeric_column |
    |----------------|
    | 123            |
    | 234            |
    | 345            |
    ```

    Useful both for model writing and testing purposes. Think of `cte_from_rows` as a way of generating
    a "temporary table-like object", with data available at runtime.

    """

    def gen_selects(rows):
        for row in rows:
            cols = []
            for name, value in row.items():
                if isinstance(value, array):
                    cols.append(value.label(name))
                elif isinstance(value, AmoraModel):
                    cols.append(struct(value).label(name))
                else:
                    cols.append(literal(value).label(name))
            yield select(*cols)

    selects = list(gen_selects(rows))

    if len(selects) == 1:
        return selects[0].cte()

    return union_all(*selects).cte()
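As a hedged illustration of the testing use case: build a CTE from literal rows, aggregate over it with an ordinary SQLAlchemy select, and execute it with `run` (documented above). The column names and values below are made up for the example.

```python
from sqlalchemy import func, select

from amora.providers.bigquery import cte_from_rows, run

# Hypothetical rows; each dict becomes one SELECT in the generated CTE
events = cte_from_rows(
    [
        {"user_id": 1, "value": 10.0},
        {"user_id": 1, "value": 5.0},
        {"user_id": 2, "value": 7.5},
    ]
)

# Aggregate over the CTE exactly as you would over a real table
stmt = select(
    events.c.user_id,
    func.sum(events.c.value).label("total"),
).group_by(events.c.user_id)

result = run(stmt)
```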

Cost estimation

By default, queries are billed using the on-demand pricing model, where you pay for the data scanned by your queries.

  • This function doesn't take into consideration that the first 1 TB per month is free.
  • By default, the estimation is based on BigQuery's On-demand analysis pricing, which may change over time and may vary according to regions and your personal contract with GCP.

You may set AMORA_GCP_BIGQUERY_ON_DEMAND_COST_PER_TERABYTE_IN_USD to the appropriate value for your use case.

More on: https://cloud.google.com/bigquery/pricing#analysis_pricing_models

:param total_bytes: Total data processed by the query
:return: The estimated cost in USD, based on On-demand price

Source code in amora/providers/bigquery.py
def estimated_query_cost_in_usd(total_bytes: int) -> float:
    """
    By default, queries are billed using the on-demand pricing model,
    where you pay for the data scanned by your queries.

    - This function doesn't take into consideration that the first 1 TB per month is free.
    - By default, the estimation is based on BigQuery's `On-demand analysis` pricing, which may change over time and
    may vary according to regions and your personal contract with GCP.

    You may set `AMORA_GCP_BIGQUERY_ON_DEMAND_COST_PER_TERABYTE_IN_USD` to the appropriate value for your use case.

    More on: https://cloud.google.com/bigquery/pricing#analysis_pricing_models

    :param total_bytes: Total data processed by the query
    :return: The estimated cost in USD, based on `On-demand` price
    """
    total_terabytes = total_bytes / 1024**4
    return total_terabytes * settings.GCP_BIGQUERY_ON_DEMAND_COST_PER_TERABYTE_IN_USD
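A small sketch combining this with `dry_run` (shown above) to estimate a model's query cost before running it. It assumes a `HeartRate` model as in the `dry_run` example, and that `DryRunResult.total_bytes` holds the processed-bytes estimate, as the constructor call above suggests.

```python
from amora.providers.bigquery import dry_run, estimated_query_cost_in_usd

# HeartRate: the example AmoraModel used in the dry_run docs above
result = dry_run(HeartRate)
if result is not None:
    cost = estimated_query_cost_in_usd(result.total_bytes)
    print(f"Estimated cost: {cost:.6f} USD for {result.total_bytes} bytes processed")
```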

Storage pricing is the cost to store data that you load into BigQuery. Active storage includes any table or table partition that has been modified in the last 90 days.

  • This function doesn't take into consideration that the first 10 GB of storage per month is free.
  • By default, the estimation is based on BigQuery's Active Storage cost per GB, which may change over time and may vary according to regions and your personal contract with GCP.

You may set AMORA_GCP_BIGQUERY_ACTIVE_STORAGE_COST_PER_GIGABYTE_IN_USD to the appropriate value for your use case.

More on: https://cloud.google.com/bigquery/pricing#storage

:param total_bytes: Total bytes stored into the table
:return: The estimated cost in USD, based on Active storage price

Source code in amora/providers/bigquery.py
def estimated_storage_cost_in_usd(total_bytes: int) -> float:
    """
    Storage pricing is the cost to store data that you load into BigQuery.
    `Active storage` includes any table or table partition that has been modified in the last 90 days.

    - This function doesn't take into consideration that the first 10 GB of storage per month is free.
    - By default, the estimation is based on BigQuery's `Active Storage` cost per GB, which may change over time and
    may vary according to regions and your personal contract with GCP.

    You may set `AMORA_GCP_BIGQUERY_ACTIVE_STORAGE_COST_PER_GIGABYTE_IN_USD` to the appropriate value for your use case.

    More on: https://cloud.google.com/bigquery/pricing#storage

    :param total_bytes: Total bytes stored into the table
    :return: The estimated cost in USD, based on `Active storage` price
    """
    total_gigabytes = total_bytes / 1024**3
    return (
        total_gigabytes * settings.GCP_BIGQUERY_ACTIVE_STORAGE_COST_PER_GIGABYTE_IN_USD
    )
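For illustration only: the function is a straight conversion from bytes to gigabytes multiplied by the configured per-GB price, so a 500 GiB table costs 500 times that price per month.

```python
from amora.providers.bigquery import estimated_storage_cost_in_usd

# 500 GiB of active storage; the result depends on the configured
# GCP_BIGQUERY_ACTIVE_STORAGE_COST_PER_GIGABYTE_IN_USD setting
table_bytes = 500 * 1024**3
print(estimated_storage_cost_in_usd(table_bytes))
```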

Data schema

Given a table_id, returns the Schema of the table by querying the BigQuery API

Source code in amora/providers/bigquery.py
def get_schema(table_id: str) -> Schema:
    """
    Given a `table_id`, returns the `Schema` of the table by querying the BigQuery API
    """
    client = get_client()
    table = client.get_table(table_id)
    return table.schema

Given an AmoraModel, returns the equivalent bigquery Schema of the model by parsing the model's SQLAlchemy column schema

Source code in amora/providers/bigquery.py
def schema_for_model(model: Model) -> Schema:
    """
    Given an `AmoraModel`, returns the equivalent bigquery `Schema`
    of the model by parsing the model SQLAlchemy column schema
    """
    columns = model.__table__.columns
    return [schema_field_for_column(col) for col in columns]

Given an AmoraModel, returns the bigquery Schema of its source classmethod query result

Source code in amora/providers/bigquery.py
def schema_for_model_source(model: Model) -> Optional[Schema]:
    """
    Given an `AmoraModel`, returns the bigquery `Schema` of its
    `source` classmethod query result
    """
    source = model.source()
    if source is None:
        return None

    result = dry_run(model)
    if result is None:
        return None

    return result.schema
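A hedged sketch of how these two helpers can be combined: check that the schema declared on a model matches the schema BigQuery infers for its `source()` query. `HeartRate` is again the example model used elsewhere on this page and is assumed to be importable.

```python
from amora.providers.bigquery import schema_for_model, schema_for_model_source

declared = schema_for_model(HeartRate)  # parsed from the model's columns
inferred = schema_for_model_source(HeartRate)  # dry run of HeartRate.source()

if inferred is not None:
    assert [field.name for field in declared] == [field.name for field in inferred]
```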
Source code in amora/providers/bigquery.py
def schema_for_struct(struct: STRUCT) -> Schema:
    return [
        SchemaField(
            name=name,
            field_type=SQLALCHEMY_TYPES_TO_BIGQUERY_TYPES[sqla_type.__class__],
        )
        for name, sqla_type in struct._STRUCT_fields
    ]

Build a BigQuery Struct type from a google.cloud.bigquery.schema.SchemaField

Read more: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#struct_type

Source code in amora/providers/bigquery.py
def struct_for_schema_field(schema_field: SchemaField) -> STRUCT:
    """
    Build a BigQuery Struct type from a `google.cloud.bigquery.schema.SchemaField`

    Read more: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#struct_type
    """

    def fields():
        for field in schema_field.fields:
            if field.mode == "REPEATED":
                if field.field_type == "RECORD":
                    yield field.name, ARRAY(struct_for_schema_field(field))
                else:
                    sqla_type = BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES[field.field_type]
                    yield field.name, ARRAY(sqla_type)
            else:
                if field.field_type == "RECORD":
                    yield field.name, struct_for_schema_field(field)
                else:
                    sqla_type = BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES[field.field_type]
                    yield field.name, sqla_type

    return STRUCT(*fields())

Build a Column from a google.cloud.bigquery.schema.SchemaField

Read more: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#tablefieldschema

Source code in amora/providers/bigquery.py
def column_for_schema_field(schema: SchemaField, **kwargs) -> dataclasses.Field:
    """
    Build a `Column` from a `google.cloud.bigquery.schema.SchemaField`

    Read more: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#tablefieldschema
    """
    if schema.mode == "REPEATED":
        if schema.field_type == "RECORD":
            column_type = ARRAY(struct_for_schema_field(schema))
        else:
            column_type = ARRAY(BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES[schema.field_type])
    else:
        if schema.field_type == "RECORD":
            column_type = struct_for_schema_field(schema)
        else:
            column_type = BIGQUERY_TYPES_TO_SQLALCHEMY_TYPES[schema.field_type]

    return Field(column_type, **kwargs)
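An illustrative call, with a made-up field definition: deriving a column for a REPEATED RECORD field.

```python
from google.cloud.bigquery import SchemaField

from amora.providers.bigquery import column_for_schema_field

# A hypothetical REPEATED RECORD field describing a list of points
points_field = SchemaField(
    "points",
    "RECORD",
    mode="REPEATED",
    fields=(
        SchemaField("x", "INTEGER"),
        SchemaField("y", "INTEGER"),
    ),
)

# Per the source above, this yields a Field whose type is
# ARRAY(STRUCT(x INTEGER, y INTEGER))
points_column = column_for_schema_field(points_field)
```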
Source code in amora/providers/bigquery.py
def schema_field_for_column(column: Column) -> SchemaField:
    fields: Iterable[SchemaField] = ()

    if isinstance(column.type, ARRAY):
        mode = "REPEATED"
        item_type = column.type.item_type
        field_type = SQLALCHEMY_TYPES_TO_BIGQUERY_TYPES[column.type.item_type.__class__]

        if isinstance(item_type, STRUCT):
            fields = tuple(schema_for_struct(item_type))
    else:
        mode = "NULLABLE"
        column_type = column.type
        field_type = SQLALCHEMY_TYPES_TO_BIGQUERY_TYPES[column_type.__class__]

        if isinstance(column_type, STRUCT):
            fields = tuple(schema_for_struct(column_type))

    return SchemaField(
        name=column.name,
        field_type=field_type,
        mode=mode,
        fields=fields or (),
        description=column.doc or _DEFAULT_VALUE,
    )

Data Sample

Given a model, returns a random sample of the data.

Read more: https://cloud.google.com/bigquery/docs/table-sampling

Parameters:

| Name       | Type             | Description                                                           | Default  |
|------------|------------------|-----------------------------------------------------------------------|----------|
| model      | Type[AmoraModel] | AmoraModel to extract the sample from                                 | required |
| percentage | int              | The percentage of the sample. E.g: percentage=10 is 10% of the data.  | 1        |
| limit      | int              | The maximum number of rows to be returned                             | 1000     |

Returns:

| Type      | Description                           |
|-----------|---------------------------------------|
| DataFrame | The sample data as a pandas.DataFrame |

Exceptions:

| Type       | Description                                                 |
|------------|-------------------------------------------------------------|
| ValueError | TABLESAMPLE SYSTEM can only be applied directly to tables.  |

Source code in amora/providers/bigquery.py
@cache(_sample_cache_key)
def sample(
    model: Model,
    percentage: int = 1,
    limit: int = settings.GCP_BIGQUERY_DEFAULT_LIMIT_SIZE,
) -> pd.DataFrame:
    """
    Given a model, returns a random sample of the data.

    Read more: [https://cloud.google.com/bigquery/docs/table-sampling](https://cloud.google.com/bigquery/docs/table-sampling)



    Args:
        model: AmoraModel to extract the sample from
        percentage: The percentage of the sample. E.g: percentage=10 is 10% of the data.
        limit: The maximum number of rows to be returned
    Returns:
        The sample data as a `pandas.DataFrame`
    Raises:
        ValueError: TABLESAMPLE SYSTEM can only be applied directly to tables.
    """
    if model.__model_config__.materialized is not MaterializationTypes.table:
        raise ValueError(
            "TABLESAMPLE SYSTEM can only be applied directly to tables. "
            "More on: https://cloud.google.com/bigquery/docs/table-sampling#limitations"
        )

    sampling = literal_column(f"{percentage} PERCENT")
    model_sample = tablesample(model, sampling)  # type: ignore
    stmt = select(model_sample).limit(limit)

    logger.debug(f"Sampling model `{model.unique_name()}`")
    return run(stmt).to_dataframe()
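A minimal usage sketch, assuming the `HeartRate` example model is materialized as a table:

```python
from amora.providers.bigquery import sample

# ~10% of the HeartRate table, capped at 500 rows, as a pandas DataFrame.
# Raises ValueError if the model is not materialized as a table.
df = sample(HeartRate, percentage=10, limit=500)
print(df.head())
```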

Repeated Fields

Read more: https://cloud.google.com/bigquery/docs/nested-repeated

A BigQuery ARRAY literal.

This is used to produce ARRAY literals in SQL expressions, e.g.:

```python
from sqlalchemy import select

from amora.compilation import compile_statement
from amora.providers.bigquery import array

stmt = select([array([1, 2]).label("a"), array([3, 4, 5]).label("b")])

compile_statement(stmt)
```
Produces the SQL:

```sql
SELECT
    ARRAY[1, 2] AS a,
    ARRAY[3, 4, 5] AS b
```

An instance of array will always have the datatype sqlalchemy_bigquery.base.BQArray. The "inner" type of the array is inferred from the values present, unless the type_ keyword argument is passed, e.g.:

array(["foo", "bar"], type_=String)

Arrays can also be constructed using AmoraModel instances, which would compile into an array of structs. E.g:

```python
class Point(AmoraModel):
    x: int
    y: int


array([Point(x=4, y=4), Point(x=2, y=2)])
```
Source code in amora/providers/bigquery.py
class array(expression.ClauseList, expression.ColumnElement):  # type: ignore
    """
    A BigQuery ARRAY literal.

    This is used to produce `ARRAY` literals in SQL expressions, e.g.:

    ```python
    from sqlalchemy import select

    from amora.compilation import compile_statement
    from amora.providers.bigquery import array

    stmt = select([array([1, 2]).label("a"), array([3, 4, 5]).label("b")])

    compile_statement(stmt)
    ```
    Produces the SQL:

    ```sql
    SELECT
        ARRAY[1, 2] AS a,
        ARRAY[3, 4, 5] AS b
    ```

    An instance of `array` will always have the datatype `sqlalchemy_bigquery.base.BQArray`.
    The "inner" type of the array is inferred from the values present, unless the
    ``type_`` keyword argument is passed, e.g.:

    ```python
    array(["foo", "bar"], type_=String)
    ```

    Arrays can also be constructed using `AmoraModel` instances,
    which would compile into an array of structs. E.g:

    ```python
    class Point(AmoraModel):
        x: int
        y: int


    array([Point(x=4, y=4), Point(x=2, y=2)])
    ```
    """

    __visit_name__ = "array"

    def __init__(self, clauses, **kw):
        if clauses and isinstance(clauses[0], AmoraModel):
            clauses = [struct(c) for c in clauses]

        clauses = [coercions.expect(roles.ExpressionElementRole, c) for c in clauses]
        super().__init__(*clauses, **kw)
        self._type_tuple = [arg.type for arg in clauses]
        main_type = kw.pop(
            "type_",
            self._type_tuple[0] if self._type_tuple else sqltypes.NULLTYPE,
        )
        self.type = BQArray(main_type, dimensions=1)

        for type_ in self._type_tuple:
            if type(type_) is sqltypes.NullType:
                raise ValueError("Array cannot have a null element")

    def _bind_param(self, operator, obj, _assume_scalar=False, type_=None):
        if _assume_scalar or operator is operators.getitem:
            # if getitem->slice were called, Indexable produces
            # a Slice object from that
            assert isinstance(obj, int)
            return expression.BindParameter(
                None,
                obj,
                _compared_to_operator=operator,
                type_=type_,
                _compared_to_type=self.type,
                unique=True,
            )

        return array(
            [
                self._bind_param(operator, o, _assume_scalar=True, type_=type_)
                for o in obj
            ]
        )

    def self_group(self, against=None):
        if against in (operators.any_op, operators.all_op, operators.getitem):
            return expression.Grouping(self)

        return self

self_group(self, against=None)

Apply a 'grouping' to this ClauseElement.

This method is overridden by subclasses to return a "grouping" construct, i.e. parenthesis. In particular it's used by "binary" expressions to provide a grouping around themselves when placed into a larger expression, as well as by select() constructs when placed into the FROM clause of another select(). (Note that subqueries should be normally created using the Select.alias() method, as many platforms require nested SELECT statements to be named).

As expressions are composed together, the application of self_group() is automatic - end-user code should never need to use this method directly. Note that SQLAlchemy's clause constructs take operator precedence into account - so parenthesis might not be needed, for example, in an expression like x OR (y AND z) - AND takes precedence over OR.

The base self_group() method of ClauseElement just returns self.

Source code in amora/providers/bigquery.py
def self_group(self, against=None):
    if against in (operators.any_op, operators.all_op, operators.getitem):
        return expression.Grouping(self)

    return self

Zipping arrays

Given at least two array columns of equal length, returns a table of the unnested values, converting array items into rows. E.g:

A CTE with 3 array columns: entity, f1, f2

```python
from amora.providers.bigquery import array, cte_from_rows, zip_arrays

cte = cte_from_rows(
    [
        {
            "entity": array([1, 2]),
            "f1": array(["f1v1", "f1v2"]),
            "f2": array(["f2v1", "f2v2"]),
        }
    ]
)

zip_arrays(cte.c.entity, cte.c.f1, cte.c.f2)
```

Will result in the following table:

| entity | f1   | f2   |
|--------|------|------|
| 1      | f1v1 | f2v1 |
| 2      | f1v2 | f2v2 |

If additional columns are needed from the original data, those can be selected using the optional additional_columns:

```python
from amora.providers.bigquery import array, cte_from_rows, zip_arrays

cte = cte_from_rows(
    [
        {
            "entity": array([1, 2]),
            "f1": array(["f1v1", "f1v2"]),
            "f2": array(["f2v1", "f2v2"]),
            "id": 1,
        },
        {
            "entity": array([3, 4]),
            "f1": array(["f1v3", "f1v4"]),
            "f2": array(["f2v3", "f2v4"]),
            "id": 2,
        },
    ]
)

zip_arrays(cte.c.entity, cte.c.f1, cte.c.f2, additional_columns=[cte.c.id])
```

| entity | f1   | f2   | id |
|--------|------|------|----|
| 1      | f1v1 | f2v1 | 1  |
| 2      | f1v2 | f2v2 | 1  |
| 3      | f1v3 | f2v3 | 2  |
| 4      | f1v4 | f2v4 | 2  |

Read more: https://cloud.google.com/bigquery/docs/reference/standard-sql/arrays#zipping_arrays

Parameters:

| Name               | Type                                          | Description                                      | Default |
|--------------------|-----------------------------------------------|--------------------------------------------------|---------|
| *arr_columns       | Column                                        | Array columns of equal length                    | ()      |
| additional_columns | Optional[List[sqlalchemy.sql.schema.Column]]  | Additional columns needed from the original data | None    |
Source code in amora/providers/bigquery.py
def zip_arrays(
    *arr_columns: Column, additional_columns: Optional[List[Column]] = None
) -> Compilable:
    """
    Given at least two array columns of equal length, returns a table of the unnested values,
    converting array items into rows. E.g:

    A CTE with 3 array columns: `entity`, `f1`, `f2`

    ```python
    from amora.providers.bigquery import array, cte_from_rows, zip_arrays

    cte = cte_from_rows(
        [
            {
                "entity": array([1, 2]),
                "f1": array(["f1v1", "f1v2"]),
                "f2": array(["f2v1", "f2v2"]),
            }
        ]
    )

    zip_arrays(cte.c.entity, cte.c.f1, cte.c.f2)
    ```

    Will result in the following table:

    | entity | f1   | f2   |
    |--------|------|------|
    | 1      | f1v1 | f2v1 |
    | 2      | f1v2 | f2v2 |

    If additional columns are needed from the original data, those can be selected
    using the optional `additional_columns`:

    ```python
    from amora.providers.bigquery import array, cte_from_rows, zip_arrays

    cte = cte_from_rows(
        [
            {
                "entity": array([1, 2]),
                "f1": array(["f1v1", "f1v2"]),
                "f2": array(["f2v1", "f2v2"]),
                "id": 1,
            },
            {
                "entity": array([3, 4]),
                "f1": array(["f1v3", "f1v4"]),
                "f2": array(["f2v3", "f2v4"]),
                "id": 2,
            },
        ]
    )

    zip_arrays(cte.c.entity, cte.c.f1, cte.c.f2, additional_columns=[cte.c.id])
    ```

    | entity | f1   | f2   | id   |
    |--------|------|------|------|
    | 1      | f1v1 | f2v1 | 1    |
    | 2      | f1v2 | f2v2 | 1    |
    | 3      | f1v3 | f2v3 | 2    |
    | 4      | f1v4 | f2v4 | 2    |

    Read more: [https://cloud.google.com/bigquery/docs/reference/standard-sql/arrays#zipping_arrays](https://cloud.google.com/bigquery/docs/reference/standard-sql/arrays#zipping_arrays)

    Args:
        *arr_columns: Array columns of equal length
        additional_columns: Additional columns needed from the original data
    """
    offset_alias = "off"
    offset = func.offset(literal_column(offset_alias))

    columns: List[ColumnElement] = [col[offset].label(col.key) for col in arr_columns]
    if additional_columns:
        columns += additional_columns

    return select(*columns).join(
        fixed_unnest(arr_columns[0]).table_valued(with_offset=offset_alias),
        onclause=literal(1) == literal(1),
        isouter=True,
    )

Structs and Nested Models

A BigQuery STRUCT/RECORD literal.

Experimental feature. You should probably use struct_for_model

Source code in amora/providers/bigquery.py
class struct(expression.ClauseList, expression.ColumnElement):  # type: ignore
    """
    A BigQuery STRUCT/RECORD literal.

    !!! warning "Experimental feature. You should probably use struct_for_model"
    """

    __visit_name__ = "struct"

    def __init__(self, model: AmoraModel, **kw):
        self._model = model
        self.type = struct_for_model(model)
        clauses = [
            coercions.expect(
                roles.ExpressionElementRole,
                dataclasses.asdict(model),  # type: ignore
                type_=self.type,
            )
        ]
        super().__init__(*clauses, **kw)

    def bind_expression(self, bindvalue):
        return bindvalue

    def self_group(self, against=None):
        if against in (operators.any_op, operators.all_op, operators.getitem):
            return expression.Grouping(self)

        return self

self_group(self, against=None)

Apply a 'grouping' to this ClauseElement.

This method is overridden by subclasses to return a "grouping" construct, i.e. parenthesis. In particular it's used by "binary" expressions to provide a grouping around themselves when placed into a larger expression, as well as by select() constructs when placed into the FROM clause of another select(). (Note that subqueries should be normally created using the Select.alias() method, as many platforms require nested SELECT statements to be named).

As expressions are composed together, the application of self_group() is automatic - end-user code should never need to use this method directly. Note that SQLAlchemy's clause constructs take operator precedence into account - so parenthesis might not be needed, for example, in an expression like x OR (y AND z) - AND takes precedence over OR.

The base self_group() method of ClauseElement just returns self.

Source code in amora/providers/bigquery.py
def self_group(self, against=None):
    if against in (operators.any_op, operators.all_op, operators.getitem):
        return expression.Grouping(self)

    return self

Build a BigQuery Struct type from an AmoraModel specification

Read more: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#struct_type

Source code in amora/providers/bigquery.py
def struct_for_model(model: Union[Model, AmoraModel]) -> STRUCT:
    """
    Build a BigQuery Struct type from an AmoraModel specification

    Read more: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#struct_type
    """

    def fields():
        for field in dataclasses.fields(model):
            if field.type == list:
                if issubclass(field.type, AmoraModel):
                    yield field.name, ARRAY(
                        struct_for_model(field.metadata[SQLALCHEMY_METADATA_KEY].type)
                    )
                else:
                    yield field.name, ARRAY(
                        field.metadata[SQLALCHEMY_METADATA_KEY].type
                    )
            elif field.type == dict:
                if issubclass(field.type, AmoraModel):
                    yield field.name, struct_for_model(
                        field.metadata[SQLALCHEMY_METADATA_KEY].type
                    )
                else:
                    yield field.name, field.metadata[SQLALCHEMY_METADATA_KEY].type
            else:
                yield field.name, field.metadata[SQLALCHEMY_METADATA_KEY].type

    return STRUCT(*fields())
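A short hedged example tying this to `cte_from_rows`: declare a nested model, inspect the STRUCT type built for it, and use instances as struct literals. The `amora.models` import path for `AmoraModel` is an assumption, as it isn't shown on this page.

```python
from amora.models import AmoraModel  # assumed import path
from amora.providers.bigquery import cte_from_rows, struct_for_model


class Point(AmoraModel):
    x: int
    y: int


# STRUCT(x INTEGER, y INTEGER), built from the model's field metadata
point_struct = struct_for_model(Point)

# AmoraModel instances in literal rows are compiled as struct literals
cte = cte_from_rows([{"point": Point(x=1, y=2)}, {"point": Point(x=3, y=4)}])
```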

Last update: 2023-11-23