redis_checkpointer_fixes (Tier 1 · 70% confidence)

infrastructure-redis-checkpointer-f-when-resuming-a-human-in-the-loop-workflow-with-as-0a2bc7c3

agent: infrastructure

When does this happen?

IF When resuming a Human-in-the-Loop workflow that uses the Redis checkpointer (AsyncRedisSaver), the system crashes with AttributeError: 'Document' object has no attribute 'blob', or with AttributeError: 'str' object has no attribute 'decode'.
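Both error messages can be reproduced without a Redis server. The sketch below uses FakeDocument, a hypothetical stand-in (not a real API). It assumes FakeDocument behaves like a RediSearch result document: every returned field becomes an attribute named exactly after the field. Under that assumption, a value stored under the JSON path '$.blob' cannot be reached as .blob. A text field that comes back as str also fails when downstream code calls .decode() on it.

```python
class FakeDocument:
    """Hypothetical stand-in for a RediSearch result document (not a real API)."""

    def __init__(self, doc_id: str, **fields: str) -> None:
        self.id = doc_id
        # Assumption: each returned field becomes an attribute named exactly like
        # the field, so "$.blob" yields an attribute literally named "$.blob".
        for name, value in fields.items():
            setattr(self, name, value)


doc = FakeDocument("write:1", type="msgpack", **{"$.blob": "payload"})

# Symptom 1: the value lives under "$.blob", so attribute access by "blob" fails.
try:
    _ = doc.blob
except AttributeError as exc:
    missing_blob_error = str(exc)

# The value is still reachable under its full field name.
blob = getattr(doc, "$.blob")

# Symptom 2: "type" comes back as str, so code expecting bytes breaks on .decode().
try:
    doc.type.decode()
except AttributeError as exc:
    decode_error = str(exc)

# Encoding the str to bytes gives the consumer the type it expects.
type_bytes = doc.type.encode()
```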

How others solved it

THEN Apply three fixes to AsyncRedisSaver._aload_pending_sends:
1) In the RediSearch query, change return_fields from 'blob' to '$.blob'.
2) In the filter expression, use the raw checkpoint_ns string instead of to_storage_safe_str(checkpoint_ns).
3) When returning results, encode the task type field to bytes (d.type.encode()).
Alternatively, subclass AsyncRedisSaver and override _aload_pending_sends with the corrected query and data handling, as in the workaround below.

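from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langgraph.checkpoint.redis.util import to_storage_safe_id
from langgraph.constants import TASKS  # import location may differ across langgraph versions
from redisvl.query import FilterQuery
from redisvl.query.filter import Tag


class FixedAsyncRedisSaver(AsyncRedisSaver):
    async def _aload_pending_sends(self, thread_id: str, checkpoint_ns: str = '', parent_checkpoint_id: str = ''):
        parent_writes_query = FilterQuery(
            filter_expression=(
                (Tag('thread_id') == to_storage_safe_id(thread_id))
                # Fix 2: filter on the raw namespace, not to_storage_safe_str(checkpoint_ns)
                & (Tag('checkpoint_ns') == checkpoint_ns)
                & (Tag('checkpoint_id') == to_storage_safe_id(parent_checkpoint_id))
                & (Tag('channel') == TASKS)
            ),
            # Fix 1: request the JSON path '$.blob' rather than 'blob'
            return_fields=['type', '$.blob', 'task_path', 'task_id', 'idx'],
            num_results=100,
        )
        res = await self.checkpoint_writes_index.search(parent_writes_query)
        # idx is returned as a string; compare it numerically so '10' sorts after '2'
        docs = sorted(res.docs, key=lambda d: (getattr(d, 'task_path', ''), getattr(d, 'task_id', ''), int(getattr(d, 'idx', 0))))
        # Fix 3: encode type to bytes; the '$.blob' attribute is only reachable via getattr
        return [(d.type.encode(), getattr(d, '$.blob')) for d in docs if getattr(d, '$.blob', None) is not None]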
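The tail of the override is plain Python, so its ordering and filtering can be checked without a Redis server. The sketch below lifts that logic into pending_sends_from_docs, a hypothetical helper. It feeds the helper SimpleNamespace objects standing in for search results. The sketch assumes idx comes back as a string and converts it with int() so that '10' sorts after '2'; a plain string comparison would put '10' first.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def pending_sends_from_docs(docs: Iterable[Any]) -> list[tuple[bytes, Any]]:
    """Hypothetical helper: order write documents and build (type, blob) pairs."""
    ordered = sorted(
        docs,
        key=lambda d: (
            getattr(d, "task_path", ""),
            getattr(d, "task_id", ""),
            # Assumption: idx arrives as a string, so compare it as an int.
            int(getattr(d, "idx", 0)),
        ),
    )
    return [
        # Encode the type to bytes; "$.blob" is only reachable via getattr.
        (d.type.encode(), getattr(d, "$.blob"))
        for d in ordered
        # Skip documents that came back without a blob.
        if getattr(d, "$.blob", None) is not None
    ]


# Fake documents standing in for search results (SimpleNamespace accepts "$.blob").
docs = [
    SimpleNamespace(type="msgpack", task_path="p", task_id="a", idx="10", **{"$.blob": "a10"}),
    SimpleNamespace(type="msgpack", task_path="p", task_id="a", idx="2", **{"$.blob": "a2"}),
    SimpleNamespace(type="msgpack", task_path="p", task_id="b", idx="0"),
]

result = pending_sends_from_docs(docs)
print(result)
```

Running it prints [(b'msgpack', 'a2'), (b'msgpack', 'a10')]. The write with idx '2' comes before idx '10', and the document without a '$.blob' field is dropped.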
