"""SWF workflow execution management."""
import abc
import enum
import datetime
import warnings
import functools
import dataclasses
import typing as t
from . import _common
if t.TYPE_CHECKING:
import botocore.client
from . import _workflows
default_executions_list_time_range = datetime.timedelta(days=90)
[docs]
@dataclasses.dataclass
class CurrentExecutionId(_common.Deserialisable, _common.Serialisable):
"""Current open workflow execution specifier."""
id: str
"""Execution workflow-ID."""
[docs]
@classmethod
def from_api(cls, data) -> "CurrentExecutionId":
return cls(id=data["workflowId"])
[docs]
def to_api(self):
return {"workflowId": self.id}
[docs]
@dataclasses.dataclass
class ExecutionId(CurrentExecutionId):
"""Workflow execution identifier."""
run_id: str
"""Execution run-ID."""
[docs]
@classmethod
def from_api(cls, data) -> "ExecutionId":
return cls(id=data["workflowId"], run_id=data["runId"])
[docs]
def to_api(self):
data = super().to_api()
data["runId"] = self.run_id
return data
[docs]
class ExecutionStatus(enum.Enum):
"""Workflow execution status."""
open = "OPEN"
"""Execution is in-progress."""
started = "OPEN"
"""Execution is in-progress."""
completed = "COMPLETED"
"""Execution has finished successfully."""
failed = "FAILED"
"""Execution has failed."""
cancelled = "CANCELED"
"""Execution has been cancelled."""
terminated = "TERMINATED"
"""Execution has been terminated."""
continued_as_new = "CONTINUED_AS_NEW"
"""Execution has been continued as a new execution."""
timed_out = "TIMED_OUT"
"""Execution has timed out."""
[docs]
@dataclasses.dataclass
class ExecutionInfo(_common.Deserialisable):
"""Workflow execution details."""
execution: ExecutionId
"""Execution ID."""
workflow: "_workflows.WorkflowId"
"""Execution workflow."""
started: datetime.datetime
"""Execution start-date."""
status: ExecutionStatus
"""Execution status."""
cancel_requested: bool
"""Execution cancellation has been requested."""
closed: datetime.datetime = None
"""Execution end-date."""
parent: ExecutionId = None
"""Parent execution ID."""
tags: t.List[str] = None
"""Execution tags."""
[docs]
@classmethod
def from_api(cls, data) -> "ExecutionInfo":
from . import _workflows
status_data = data["executionStatus"]
if status_data == "CLOSED":
status_data = data["closeStatus"]
return cls(
execution=ExecutionId.from_api(data["execution"]),
workflow=_workflows.WorkflowId.from_api(data["workflowType"]),
started=data["startTimestamp"],
status=ExecutionStatus(status_data),
cancel_requested=data["cancelRequested"],
closed=data.get("closeTimestamp"),
parent=data.get("parent") and ExecutionId.from_api(data["parent"]),
tags=data.get("tagList"),
)
[docs]
class ChildExecutionTerminationPolicy(str, enum.Enum):
"""Child workflow execution ending policy on parent termination."""
terminate = "TERMINATE"
"""Terminate child executions."""
request_cancel = "REQUEST_CANCEL"
"""Request for child execution cancellation."""
abandon = "ABANDON"
"""Abandon child executions."""
[docs]
@dataclasses.dataclass
class ExecutionConfiguration(_common.Deserialisable):
"""Workflow execution configuration."""
timeout: t.Union[datetime.timedelta, None]
"""Execution run-time timeout."""
decision_task_timeout: t.Union[datetime.timedelta, None]
"""Decision task timeout."""
decision_task_list: str
"""Decision task task-list."""
child_execution_policy_on_termination: ChildExecutionTerminationPolicy
"""Child workflow execution ending policy on termination."""
decision_task_priority: int = None
"""Decision task priority."""
lambda_iam_role_arn: str = None
"""Execution IAM role ARN for Lambda invocations."""
[docs]
@classmethod
def from_api(cls, data) -> "ExecutionConfiguration":
child_policy = ChildExecutionTerminationPolicy(data["childPolicy"])
decision_task_timeout = _common.parse_timeout(data["taskStartToCloseTimeout"])
return cls(
timeout=_common.parse_timeout(data["executionStartToCloseTimeout"]),
decision_task_timeout=decision_task_timeout,
decision_task_list=data["taskList"]["name"],
decision_task_priority=(
data.get("taskPriority") and int(data["taskPriority"])
),
child_execution_policy_on_termination=child_policy,
lambda_iam_role_arn=data.get("lambdaRole"),
)
[docs]
@dataclasses.dataclass
class PartialExecutionConfiguration(
ExecutionConfiguration, _common.SerialisableToArguments
):
"""Partial workflow execution configuration."""
timeout: t.Union[datetime.timedelta, None] = _common.unset
decision_task_timeout: t.Union[datetime.timedelta, None] = _common.unset
decision_task_list: str = None
decision_task_priority: int = None
child_execution_policy_on_termination: ChildExecutionTerminationPolicy = None
[docs]
@classmethod
def from_api(cls, data) -> "PartialExecutionConfiguration":
return cls(
timeout=(
data.get("executionStartToCloseTimeout") and
_common.parse_timeout(data["executionStartToCloseTimeout"])
),
decision_task_timeout=(
data.get("taskStartToCloseTimeout") and
_common.parse_timeout(data["taskStartToCloseTimeout"])
),
decision_task_list=data.get("taskList") and data["taskList"]["name"],
decision_task_priority=(
data.get("taskPriority") and int(data["taskPriority"])
),
child_execution_policy_on_termination=(
data.get("childPolicy") and
ChildExecutionTerminationPolicy(data["childPolicy"])
),
lambda_iam_role_arn=data.get("lambdaRole"),
)
[docs]
def get_api_args(self):
data = {}
if self.timeout or self.timeout == datetime.timedelta(0):
data["executionStartToCloseTimeout"] = str(
int(self.timeout.total_seconds())
)
elif self.timeout is None:
data["executionStartToCloseTimeout"] = "NONE"
decision_task_timeout = self.decision_task_timeout
if decision_task_timeout or decision_task_timeout == datetime.timedelta(0):
data["taskStartToCloseTimeout"] = str(
int(decision_task_timeout.total_seconds())
)
elif decision_task_timeout is None:
data["taskStartToCloseTimeout"] = "NONE"
if self.decision_task_list or self.decision_task_list == "":
data["taskList"] = {"name": self.decision_task_list}
if self.decision_task_priority or self.decision_task_priority == 0:
data["taskPriority"] = str(self.decision_task_priority)
if self.child_execution_policy_on_termination:
data["childPolicy"] = self.child_execution_policy_on_termination.value
if self.lambda_iam_role_arn or self.lambda_iam_role_arn == "":
data["lambdaRole"] = self.lambda_iam_role_arn
return data
[docs]
@dataclasses.dataclass
class ExecutionOpenCounts:
"""Counts of workflow executions' open tasks/timers/children."""
activity_tasks: int
"""Number of scheduled/started activity tasks."""
decision_tasks: int
"""Number of scheduled/started decision tasks."""
timers: int
"""Number of started timers."""
child_executions: int
"""Number of started child executions."""
lambda_tasks: int = None
"""Number of scheduled/started Lambda invocations."""
[docs]
@classmethod
def from_api(cls, data) -> "ExecutionOpenCounts":
return cls(
data["openActivityTasks"],
data["openDecisionTasks"],
data["openTimers"],
data["openChildWorkflowExecutions"],
lambda_tasks=data.get("openLambdaFunctions"),
)
[docs]
@dataclasses.dataclass
class ExecutionDetails:
"""Workflow execution details, configuration, open-counts and snapshot."""
info: ExecutionInfo
"""Execution details."""
configuration: ExecutionConfiguration = None
"""Execution configuration."""
n_open: ExecutionOpenCounts = None
"""Counts of open tasks/timers/children in execution."""
latest_activity_task_scheduled: datetime.datetime = None
"""Most recent activity task's scheduled's date."""
latest_context: str = None
"""Most recent decision's execution context."""
[docs]
@classmethod
def from_api(cls, data) -> "ExecutionDetails":
config = ExecutionConfiguration.from_api(data["executionConfiguration"])
return cls(
info=ExecutionInfo.from_api(data["executionInfo"]),
configuration=config,
n_open=ExecutionOpenCounts.from_api(data["openCounts"]),
latest_activity_task_scheduled=data.get("latestActivityTaskTimestamp"),
latest_context=data.get("latestExecutionContext"),
)
[docs]
@dataclasses.dataclass
class ExecutionFilter(_common.SerialisableToArguments, metaclass=abc.ABCMeta):
"""Workflow execution filter."""
[docs]
@abc.abstractmethod
def get_api_args(self):
pass
@dataclasses.dataclass
class DateTimeFilter(_common.Serialisable):
"""Date-time property filter mix-in."""
earliest: datetime.datetime
"""Earliest date."""
latest: datetime.datetime = None
"""Latest date."""
def to_api(self):
data = {"oldestDate": self.earliest}
if self.latest:
data["latestDate"] = self.latest
return data
[docs]
@dataclasses.dataclass
class StartTimeExecutionFilter(DateTimeFilter, ExecutionFilter):
"""Workflow execution filter on start-time."""
[docs]
def get_api_args(self):
return {"startTimeFilter": self.to_api()}
[docs]
@dataclasses.dataclass
class CloseTimeExecutionFilter(DateTimeFilter, ExecutionFilter):
"""Workflow execution filter on close-time."""
[docs]
def get_api_args(self):
return {"closeTimeFilter": self.to_api()}
[docs]
@dataclasses.dataclass
class IdExecutionFilter(ExecutionFilter):
"""Workflow execution filter on execution workflow-ID."""
execution: CurrentExecutionId
"""Execution ID."""
[docs]
def get_api_args(self):
return {"executionFilter": self.execution.to_api()}
[docs]
@dataclasses.dataclass
class WorkflowTypeExecutionFilter(ExecutionFilter):
"""Workflow execution filter on execution workflow-type."""
workflow: t.Union["_workflows.WorkflowId", "_workflows.WorkflowIdFilter"]
"""Execution workflow."""
[docs]
def get_api_args(self):
return {"typeFilter": self.workflow.to_api()}
[docs]
@dataclasses.dataclass
class TagExecutionFilter(ExecutionFilter):
"""Workflow execution filter on execution tags."""
tag: str
"""Execution tag."""
[docs]
def get_api_args(self):
return {"tagFilter": {"tag": self.tag}}
[docs]
@dataclasses.dataclass
class CloseStatusExecutionFilter(ExecutionFilter):
"""Workflow execution filter on execution close-status."""
status: str
"""Execution status."""
[docs]
def get_api_args(self):
return {"closeStatusFilter": {"status": self.status}}
TimeFilter = t.TypeVar("TimeFilter", StartTimeExecutionFilter, CloseTimeExecutionFilter)
def _default_time_filter(cls: t.Type[TimeFilter]) -> TimeFilter:
"""Construct a default execution time-filter."""
now = datetime.datetime.now(tz=datetime.timezone.utc)
return cls(earliest=now - default_executions_list_time_range)
def _get_number_of_executions(
time_filter: t.Union[StartTimeExecutionFilter, CloseTimeExecutionFilter],
domain: str,
property_filter: t.Union[
IdExecutionFilter,
WorkflowTypeExecutionFilter,
TagExecutionFilter,
CloseStatusExecutionFilter,
None,
],
client_method: t.Callable[..., t.Dict[str, t.Any]],
) -> int:
"""Get the number of executions matching filter."""
kw = time_filter.get_api_args()
if property_filter:
kw.update(property_filter.get_api_args())
response = client_method(domain=domain, **kw)
if response["truncated"]:
warnings.warn("Actual execution count greater than returned amount")
return response["count"]
[docs]
def get_number_of_closed_executions(
domain: str,
time_filter: t.Union[StartTimeExecutionFilter, CloseTimeExecutionFilter] = None,
property_filter: t.Union[
IdExecutionFilter,
WorkflowTypeExecutionFilter,
TagExecutionFilter,
CloseStatusExecutionFilter,
] = None,
client: "botocore.client.BaseClient" = None,
) -> int:
"""Get the number of closed workflow executions.
Warns if the number of matching executions is greater than what's
returned.
Args:
domain: domain of executions
time_filter: execution start-time/close-time filter, default:
executions closed less than 90 days ago
property_filter: execution
workflow-ID/workflow-type/tags/close-status filter
client: SWF client
Returns:
number of matching workflow executions
"""
client = _common.ensure_client(client)
time_filter = time_filter or _default_time_filter(CloseTimeExecutionFilter)
return _get_number_of_executions(
time_filter, domain, property_filter, client.count_closed_workflow_executions
)
[docs]
def get_number_of_open_executions(
domain: str,
started_filter: StartTimeExecutionFilter = None,
property_filter: t.Union[
IdExecutionFilter,
WorkflowTypeExecutionFilter,
TagExecutionFilter,
] = None,
client: "botocore.client.BaseClient" = None,
) -> int:
"""Get the number of open workflow executions.
Warns if the number of matching executions is greater than what's
returned.
Args:
domain: domain of executions
started_filter: execution start-time filter, default: executions
opened less than 90 days ago
property_filter: execution workflow-ID/workflow-type/tags filter
client: SWF client
Returns:
number of matching workflow executions
"""
client = _common.ensure_client(client)
started_filter = started_filter or _default_time_filter(StartTimeExecutionFilter)
return _get_number_of_executions(
started_filter, domain, property_filter, client.count_open_workflow_executions
)
[docs]
def describe_execution(
execution: ExecutionId,
domain: str,
client: "botocore.client.BaseClient" = None,
) -> ExecutionDetails:
"""Describe a workflow execution.
Args:
execution: workflow execution to describe
domain: domain of workflow execution
client: SWF client
Returns:
workflow execution details, configuration, open-counts and snapshot
"""
client = _common.ensure_client(client)
response = client.describe_workflow_execution(
domain=domain, execution=execution.to_api()
)
return ExecutionDetails.from_api(response)
[docs]
def list_closed_executions(
domain: str,
time_filter: t.Union[StartTimeExecutionFilter, CloseTimeExecutionFilter] = None,
property_filter: t.Union[
IdExecutionFilter,
WorkflowTypeExecutionFilter,
TagExecutionFilter,
CloseStatusExecutionFilter,
] = None,
reverse: bool = False,
client: "botocore.client.BaseClient" = None,
) -> t.Generator[ExecutionInfo, None, None]:
"""List closed workflow executions; retrieved semi-lazily.
Args:
domain: domain of executions
time_filter: execution start-time/close-time filter, default:
executions closed less than 90 days ago
property_filter: execution
workflow-ID/workflow-type/tags/close-status filter
reverse: return results in reverse start/close order
client: SWF client
Returns:
matching workflow executions
"""
client = _common.ensure_client(client)
time_filter = time_filter or _default_time_filter(CloseTimeExecutionFilter)
kw = time_filter.get_api_args()
if property_filter:
kw.update(property_filter.get_api_args())
call = functools.partial(
client.list_closed_workflow_executions,
domain=domain,
reverseOrder=reverse,
**kw,
)
return _common.iter_paged(call, ExecutionInfo.from_api, "executionInfos")
[docs]
def list_open_executions(
domain: str,
started_filter: StartTimeExecutionFilter = None,
property_filter: t.Union[
IdExecutionFilter,
WorkflowTypeExecutionFilter,
TagExecutionFilter,
] = None,
reverse: bool = False,
client: "botocore.client.BaseClient" = None,
) -> t.Generator[ExecutionInfo, None, None]:
"""List open workflow executions; retrieved semi-lazily.
Args:
domain: domain of executions
started_filter: execution start-time filter, default: executions
opened less than 90 days ago
property_filter: execution workflow-ID/workflow-type/tags filter
reverse: return results in reverse start order
client: SWF client
Returns:
matching workflow executions
"""
client = _common.ensure_client(client)
started_filter = started_filter or _default_time_filter(StartTimeExecutionFilter)
kw = {}
if property_filter:
kw.update(property_filter.get_api_args())
call = functools.partial(
client.list_open_workflow_executions,
domain=domain,
startTimeFilter=started_filter.to_api(),
reverseOrder=reverse,
**kw,
)
return _common.iter_paged(call, ExecutionInfo.from_api, "executionInfos")
[docs]
def request_cancel_execution(
execution: t.Union[CurrentExecutionId, ExecutionId],
domain: str,
client: "botocore.client.BaseClient" = None,
) -> None:
"""Request the cancellation of a workflow execution.
Args:
execution: execution to cancel
domain: domain of execution
client: SWF client
"""
client = _common.ensure_client(client)
kw = {}
if isinstance(execution, ExecutionId):
kw["runId"] = execution.run_id
client.request_cancel_workflow_execution(
domain=domain, workflowId=execution.id, **kw
)
[docs]
def signal_execution(
execution: t.Union[CurrentExecutionId, ExecutionId],
signal: str,
domain: str,
input_: str = None,
client: "botocore.client.BaseClient" = None,
) -> None:
"""Send a signal to a workflow execution.
Args:
execution: execution to signal
signal: signal name
domain: domain of execution
input_: attached signal data
client: SWF client
"""
client = _common.ensure_client(client)
kw = {}
if isinstance(execution, ExecutionId):
kw["runId"] = execution.run_id
if input_ or input_ == "":
kw["input"] = input_
client.request_cancel_workflow_execution(
domain=domain,
workflowId=execution.id,
signalName=signal,
**kw,
)
[docs]
def start_execution(
workflow: "_workflows.WorkflowId",
execution: CurrentExecutionId,
domain: str,
input: str = None,
configuration: PartialExecutionConfiguration = None,
tags: t.List[str] = None,
client: "botocore.client.BaseClient" = None,
) -> ExecutionId:
"""Start a workflow execution.
Args:
workflow: workflow type for execution
execution: execution workflow-ID
domain: domain for execution
input: execution input
configuration: execution configuration, default: use defaults for
workflow type
tags: execution tags
client: SWF client
Returns:
workflow execution, with run-ID
"""
client = _common.ensure_client(client)
configuration = configuration or PartialExecutionConfiguration()
kw = configuration.get_api_args()
if input or input == "":
kw["input"] = input
if tags or tags == []:
kw["tagList"] = tags
response = client.start_workflow_execution(
domain=domain,
workflowId=execution.id,
workflowType=workflow.to_api(),
**kw,
)
return ExecutionId(id=execution.id, run_id=response["runId"])
[docs]
def terminate_execution(
execution: t.Union[CurrentExecutionId, ExecutionId],
domain: str,
reason: str = None,
details: str = None,
child_execution_policy: ChildExecutionTerminationPolicy = None,
client: "botocore.client.BaseClient" = None,
) -> None:
"""Terminate (immediately close) a workflow execution.
Args:
execution: workflow execution to close
domain: domain od execution
reason: termination reason, usually for classification
details: termination details, usually for explanation
child_execution_policy: how to handle open child workflow
executions, default: use default for workflow type
client: SWF client
"""
client = _common.ensure_client(client)
kw = {}
if isinstance(execution, ExecutionId):
kw["runId"] = execution.run_id
if reason or reason == "":
kw["reason"] = reason
if details or details == "":
kw["details"] = details
if child_execution_policy:
kw["childPolicy"] = child_execution_policy.value
client.terminate_workflow_execution(
domain=domain,
workflowId=execution.id,
**kw,
)