Worker API Reference#
- class lifeblood.worker.Worker(scheduler_addr: AddressChain, *, child_priority_adjustment: ProcessPriorityAdjustment = ProcessPriorityAdjustment.NO_CHANGE, worker_type: WorkerType = WorkerType.STANDARD, config: Optional[Config] = None, singleshot: bool = False, scheduler_ping_interval: float = 10, scheduler_ping_miss_threshold: int = 6, worker_id: Optional[int] = None, pool_address: Optional[AddressChain] = None)#
- __init__(scheduler_addr: AddressChain, *, child_priority_adjustment: ProcessPriorityAdjustment = ProcessPriorityAdjustment.NO_CHANGE, worker_type: WorkerType = WorkerType.STANDARD, config: Optional[Config] = None, singleshot: bool = False, scheduler_ping_interval: float = 10, scheduler_ping_miss_threshold: int = 6, worker_id: Optional[int] = None, pool_address: Optional[AddressChain] = None)#
- Parameters:
scheduler_addr –
worker_type –
singleshot –
- async _cleanup_extra_files()#
Clean up extra files transferred with the task.
- async cancel_task()#
- async delete_logs(invocation_id: int)#
- async deliver_invocation_message(destination_invocation_id: int, destination_addressee: str, source_invocation_id: Optional[int], message_body: bytes, addressee_timeout: float = 90.0)#
Deliver a message to a task.
The idea is to deliver ONLY when the message is being waited for, so queues are added and removed by the receiver, not by this deliver method. The current implementation is NOT thread safe: it relies on async to separate important regions.
- get_log_filepath(level, invocation_id: int = None)#
- is_started()#
- is_stopping() -> bool#
Returns True if stop was called on the worker, so the worker is closed or in the process of closing.
- is_task_running() -> bool#
- message_processor() -> WorkerMessageProcessor#
- async run_task(task: Invocation, report_to: AddressChain)#
- running_invocation() -> Optional[Invocation]#
- scheduler_message_address() -> AddressChain#
- async scheduler_pinger()#
Ping the scheduler once in a while. If it misses too many pings, close the worker and wait for new broadcasts.
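The ping-and-give-up behaviour described for scheduler_pinger can be illustrated with a small standalone sketch. It does not use lifeblood itself: `ping_until_lost`, `fake_ping` and `demo` are hypothetical names, and the rule that a successful ping resets the miss counter is an assumption rather than something this reference states. Only the defaults (interval 10, threshold 6) come from the constructor signature above.

```python
import asyncio
from typing import Awaitable, Callable


async def ping_until_lost(
    ping: Callable[[], Awaitable[bool]],
    interval: float = 10,
    miss_threshold: int = 6,
) -> int:
    """Ping repeatedly and return the number of pings sent once
    `miss_threshold` pings in a row have failed."""
    misses = 0
    sent = 0
    while misses < miss_threshold:
        await asyncio.sleep(interval)
        sent += 1
        try:
            ok = await ping()
        except (OSError, asyncio.TimeoutError):
            ok = False
        # Assumption: a successful ping resets the miss counter.
        misses = 0 if ok else misses + 1
    return sent


async def demo() -> int:
    # Hypothetical scheduler that answers twice, then goes silent.
    replies = iter([True, True])

    async def fake_ping() -> bool:
        return next(replies, False)

    return await ping_until_lost(fake_ping, interval=0.001, miss_threshold=3)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real worker, the point where this loop returns is where the worker would be closed and start waiting for new scheduler broadcasts.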
- async start()#
- stop()#
- async task_finished()#
Called when the current process finishes.
- task_status() -> Optional[float]#
- wait_till_starts()#
- async wait_till_stops()#
- worker_message_address() -> DirectAddress#
- async worker_task_addressee_wait(addressee: str, timeout: float = 30) -> Tuple[int, bytes]#
Wait for a data message addressed to addressee to be delivered.
- Returns:
sender invocation id, message body
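The interplay between deliver_invocation_message and worker_task_addressee_wait (the receiver owns the queue, the sender delivers only while someone is waiting) can be modelled with a toy example. This is a sketch under stated assumptions, not the worker's implementation: `AddresseeQueues`, its methods, the `poll` parameter and the boolean return of `deliver` are all hypothetical, and it assumes one waiter per addressee.

```python
import asyncio
from typing import Dict, Tuple


class AddresseeQueues:
    """Toy model: a queue exists only while a receiver is waiting on it."""

    def __init__(self) -> None:
        self._queues: Dict[str, asyncio.Queue] = {}

    async def wait(self, addressee: str, timeout: float = 30) -> Tuple[int, bytes]:
        # The receiver creates the queue and removes it when it is done.
        queue = self._queues.setdefault(addressee, asyncio.Queue())
        try:
            return await asyncio.wait_for(queue.get(), timeout)
        finally:
            self._queues.pop(addressee, None)

    async def deliver(
        self,
        addressee: str,
        source_id: int,
        body: bytes,
        addressee_timeout: float = 90.0,
        poll: float = 0.01,  # hypothetical polling step
    ) -> bool:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + addressee_timeout
        # Wait until a receiver has registered a queue for this addressee.
        while addressee not in self._queues:
            if loop.time() >= deadline:
                return False  # assumption: report failure instead of raising
            await asyncio.sleep(poll)
        # No await between the check and the put, so no other coroutine can
        # remove the queue in between. This is not safe across threads.
        self._queues[addressee].put_nowait((source_id, body))
        return True


async def demo():
    queues = AddresseeQueues()
    # Nobody is waiting yet, so delivery gives up after the timeout.
    undelivered = await queues.deliver("results", 7, b"early", addressee_timeout=0.05)
    # Start a receiver, then deliver while it waits.
    receiver = asyncio.create_task(queues.wait("results", timeout=1))
    delivered = await queues.deliver("results", 7, b"hello", addressee_timeout=1)
    return undelivered, delivered, await receiver


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sketch relies on cooperative scheduling in the same way the description of deliver_invocation_message suggests: the check-then-put step contains no `await`, so it cannot be interleaved with another coroutine, but it offers no protection against access from another thread.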