"""SWF activity task management."""
import datetime
import warnings
import dataclasses
import typing as t
from . import _common
if t.TYPE_CHECKING:
import botocore.client
from . import _activities
from . import _executions
[docs]
class Cancelled(Exception):
"""Current activity task has been cancelled."""
[docs]
@dataclasses.dataclass
class TaskConfiguration(_common.Deserialisable):
"""Activity task configuration."""
task_list: str
"""Task task-list."""
runtime_timeout: t.Union[datetime.timedelta, None]
"""Finish timeout."""
schedule_timeout: t.Union[datetime.timedelta, None]
"""Start timeout."""
total_timeout: t.Union[datetime.timedelta, None]
"""Total finish timeout."""
heartbeat_timeout: t.Union[datetime.timedelta, None] = _common.unset
"""Heartbeat timeout."""
priority: int = None
"""Task priority."""
[docs]
@classmethod
def from_api(cls, data) -> "TaskConfiguration":
return cls(
task_list=data["taskList"]["name"],
runtime_timeout=_common.parse_timeout(data["taskStartToCloseTimeout"]),
schedule_timeout=_common.parse_timeout(data["taskScheduleToStartTimeout"]),
total_timeout=_common.parse_timeout(data["taskScheduleToCloseTimeout"]),
heartbeat_timeout=_common.parse_timeout(data["taskHeartbeatTimeout"]),
priority=int(data["taskPriority"]),
)
[docs]
@dataclasses.dataclass
class PartialTaskConfiguration(TaskConfiguration, _common.SerialisableToArguments):
"""Partial activity task configuration."""
task_list: str = None
runtime_timeout: t.Union[datetime.timedelta, None] = _common.unset
schedule_timeout: t.Union[datetime.timedelta, None] = _common.unset
total_timeout: t.Union[datetime.timedelta, None] = _common.unset
[docs]
@classmethod
def from_api(cls, data) -> "PartialTaskConfiguration":
return cls(
task_list=data.get("taskList") and data["taskList"]["name"],
runtime_timeout=(
data.get("taskStartToCloseTimeout") and
_common.parse_timeout(data["taskStartToCloseTimeout"])
),
schedule_timeout=(
data.get("taskScheduleToStartTimeout") and
_common.parse_timeout(data["taskScheduleToStartTimeout"])
),
total_timeout=(
data.get("taskScheduleToCloseTimeout") and
_common.parse_timeout(data["taskScheduleToCloseTimeout"])
),
heartbeat_timeout=(
data.get("taskHeartbeatTimeout") and
_common.parse_timeout(data["taskHeartbeatTimeout"])
),
priority=data.get("taskPriority") and int(data["taskPriority"]),
)
[docs]
def get_api_args(self):
data = {}
if self.task_list or self.task_list == "":
data["taskList"] = {"name": self.task_list}
for timeout, name in [
(self.runtime_timeout, "StartToClose"),
(self.schedule_timeout, "ScheduleToStart"),
(self.total_timeout, "ScheduleToClose"),
(self.heartbeat_timeout, "Heartbeat"),
]:
if timeout or timeout == datetime.timedelta(0):
data[f"task{name}Timeout"] = str(int(timeout.total_seconds()))
elif timeout is None:
data[f"task{name}Timeout"] = "NONE"
if self.priority or self.priority == 0:
data["taskPriority"] = str(self.priority)
return data
[docs]
@dataclasses.dataclass
class WorkerTask(_common.Deserialisable):
"""Activity worker activity task."""
token: str
"""Task token, provided by SWF."""
id: str
"""Task ID."""
activity: "_activities.ActivityId"
"""Task activity."""
execution: "_executions.ExecutionId"
"""Task execution."""
task_started_execution_history_event_id: int
"""History event ID for task start."""
input: str = None
"""Task input."""
[docs]
@classmethod
def from_api(cls, data) -> "WorkerTask":
from . import _activities
from . import _executions
return cls(
token=data["taskToken"],
id=data["activityId"],
activity=_activities.ActivityId.from_api(data["activityType"]),
execution=_executions.ExecutionId.from_api(data["workflowExecution"]),
task_started_execution_history_event_id=data["startedEventId"],
input=data.get("input"),
)
[docs]
def get_number_of_pending_tasks(
task_list: str,
domain: str,
client: "botocore.client.BaseClient" = None,
) -> int:
"""Get the number of pending activity tasks.
Warns if the number of pending tasks is greater than what's returned.
Args:
task_list: activity task-list
domain: domain of task-list
client: SWF client
Returns:
number of pending tasks
"""
client = _common.ensure_client(client)
response = client.count_pending_activity_tasks(
domain=domain, taskList=dict(name=task_list)
)
if response["truncated"]:
warnings.warn("Actual task count greater than returned amount")
return response["count"]
[docs]
def request_task(
task_list: str,
domain: str,
worker_identity: str = None,
no_tasks_callback: t.Callable[[], None] = None,
client: "botocore.client.BaseClient" = None,
) -> WorkerTask:
"""Request (poll for) an activity task; blocks until task is received.
Args:
task_list: activity task-list to request from
domain: domain of task-list
worker_identity: activity worker identity, recorded in execution
history
no_tasks_callback: called after no tasks were provided by SWF
client: SWF client
Returns:
activity task
"""
client = _common.ensure_client(client)
kw = {}
if worker_identity or worker_identity == "":
kw["identity"] = worker_identity
with _common.polling_socket_timeout():
while True:
response = client.poll_for_activity_task(
domain=domain, taskList=dict(name=task_list), **kw
)
if response["taskToken"]:
break
no_tasks_callback()
return WorkerTask.from_api(response)
[docs]
def send_heartbeat(
token: str,
details: str = None,
client: "botocore.client.BaseClient" = None,
) -> None:
"""Send a heartbeat to SWF for the current activity task.
Args:
token: activity task token
details: activity task progress message
client: SWF client
Raises:
Cancelled: if activity task was cancelled by decider
"""
client = _common.ensure_client(client)
kw = {}
if details or details == "":
kw["details"] = details
response = client.record_activity_task_heartbeat(taskToken=token, **kw)
if response["cancelRequested"]:
raise Cancelled(f"SWF activity task {token!r} cancelled")
[docs]
def cancel_task(
token: str,
details: str = None,
client: "botocore.client.BaseClient" = None,
) -> None:
"""Cancel the current activity task.
Only valid if the activity task is open and has a cancellation request.
Args:
token: activity task token
details: extra information, usually for explanation
client: SWF client
"""
client = _common.ensure_client(client)
kw = {}
if details or details == "":
kw["details"] = details
client.respond_activity_task_canceled(taskToken=token, **kw)
[docs]
def complete_task(
token: str,
result: str = None,
client: "botocore.client.BaseClient" = None,
) -> None:
"""Complete the current activity task.
Only valid if the activity task is open.
Args:
token: activity task token
result: task result
client: SWF client
"""
client = _common.ensure_client(client)
kw = {}
if result or result == "":
kw["result"] = result
client.respond_activity_task_completed(taskToken=token, **kw)
[docs]
def fail_task(
token: str,
reason: str = None,
details: str = None,
client: "botocore.client.BaseClient" = None,
) -> None:
"""Fail the current activity task.
Only valid if the activity task is open.
Args:
token: activity task token
reason: failure reason, usually for classification
details: failure details, usually for explanation
client: SWF client
"""
client = _common.ensure_client(client)
kw = {}
if reason or reason == "":
kw["reason"] = reason
if details or details == "":
kw["details"] = details
client.respond_activity_task_failed(taskToken=token, **kw)