Skip to content

Base

blackline.adapters.base

AdapterBase

Bases: ABC

Source code in BAR /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
class AdapterBase(ABC):
    date_format = "%Y-%m-%d"

    def __init__(self, config, condition: Optional[str] = None) -> None:
        # def __init__(self, config: DataStoreBase) -> None:
        self.config = config
        self.condition = condition

    @abstractmethod
    def connection(self) -> Any:
        """
        Override this method to return a context manager that is
        connection object.
        """
        pass

    @abstractmethod
    def execute(self, sql: str, values: Optional[Dict[str, Any]] = None) -> Any:
        """
        Override this method to execute a query. It should run a self.connection()
        as a context manager and return the results of the query.

        Args:
            sql: The SQL query string to execute.
            values : The values to pass into the sql query if query parameters are used.
            Defaults to None.

        Returns:
            Any: A Cursor with an executed query.
        """
        with self.connection() as conn:
            with conn.cursor() as cur:
                return cur.execute(sql, values)

    @abstractmethod
    def test_connection(self) -> bool:
        pass

    @abstractmethod
    def update_template(self) -> str:
        pass

    @abstractmethod
    def set_template(self) -> str:
        pass

    @abstractmethod
    def redact_template(self) -> str:
        pass

    @abstractmethod
    def replace_template(self) -> str:
        pass

    @abstractmethod
    def mask_template(self) -> str:
        pass

    @abstractmethod
    def where_template(self) -> str:
        pass

    # @abstractmethod
    def validate_dataset(self, dataset: Dataset) -> DatasetCollectionValidation:
        return DatasetCollectionValidation()

condition = condition instance-attribute

config = config instance-attribute

date_format = '%Y-%m-%d' class-attribute instance-attribute

__init__(config, condition=None)

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
def __init__(self, config, condition: Optional[str] = None) -> None:
    # def __init__(self, config: DataStoreBase) -> None:
    self.config = config
    self.condition = condition

connection() abstractmethod

Override this method to return a context manager that is connection object.

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
@abstractmethod
def connection(self) -> Any:
    """
    Override this method to return a context manager that is
    connection object.
    """
    pass

execute(sql, values=None) abstractmethod

Override this method to execute a query. It should run a self.connection() as a context manager and return the results of the query.

Parameters:

Name Type Description Default
sql str

The SQL query string to execute.

required
values

The values to pass into the sql query if query parameters are used.

None

Returns:

Name Type Description
Any Any

A Cursor with an executed query.

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
@abstractmethod
def execute(self, sql: str, values: Optional[Dict[str, Any]] = None) -> Any:
    """
    Override this method to execute a query. It should run a self.connection()
    as a context manager and return the results of the query.

    Args:
        sql: The SQL query string to execute.
        values : The values to pass into the sql query if query parameters are used.
        Defaults to None.

    Returns:
        Any: A Cursor with an executed query.
    """
    with self.connection() as conn:
        with conn.cursor() as cur:
            return cur.execute(sql, values)

mask_template() abstractmethod

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
@abstractmethod
def mask_template(self) -> str:
    pass

redact_template() abstractmethod

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
@abstractmethod
def redact_template(self) -> str:
    pass

replace_template() abstractmethod

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
@abstractmethod
def replace_template(self) -> str:
    pass

set_template() abstractmethod

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
@abstractmethod
def set_template(self) -> str:
    pass

test_connection() abstractmethod

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
@abstractmethod
def test_connection(self) -> bool:
    pass

update_template() abstractmethod

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
@abstractmethod
def update_template(self) -> str:
    pass

validate_dataset(dataset)

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
def validate_dataset(self, dataset: Dataset) -> DatasetCollectionValidation:
    return DatasetCollectionValidation()

where_template() abstractmethod

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/blackline/adapters/base.py
@abstractmethod
def where_template(self) -> str:
    pass