redis_checkpointer_hilTier 1 · 70% confidence

infrastructure-redis-checkpointer-h-asyncredissaver-crashes-with-attributeerror-when-r-a700b9d1

agent: infrastructure

When does this happen?

IF AsyncRedisSaver crashes with AttributeError when resuming a Human-in-the-Loop workflow via Command(resume=value).

How others solved it

THEN Fix three bugs in AsyncRedisSaver._aload_pending_sends: (1) Use JSON-path '$.blob' instead of 'blob' in return_fields. (2) Use the raw checkpoint_ns string instead of to_storage_safe_str() in the filter. (3) Encode the type field as bytes (d.type.encode()) before returning. A custom subclass overriding the method can serve as a workaround until the upstream fix is released.

class FixedAsyncRedisSaver(AsyncRedisSaver):
    async def _aload_pending_sends(self, thread_id, checkpoint_ns='', parent_checkpoint_id=''):
        from redisvl.query import FilterQuery
        from redisvl.query.filter import Tag
        from langgraph.checkpoint.redis.util import to_storage_safe_id
        from langgraph.constants import TASKS
        query = FilterQuery(
            filter_expression=(
                (Tag('thread_id') == to_storage_safe_id(thread_id))
                & (Tag('checkpoint_ns') == checkpoint_ns)
                & (Tag('checkpoint_id') == to_storage_safe_id(parent_checkpoint_id))
                & (Tag('channel') == TASKS)
            ),
            return_fields=['type', '$.blob', 'task_path', 'task_id', 'idx'],
            num_results=100
        )
        res = await self.checkpoint_writes_index.search(query)
        docs = sorted(res.docs, key=lambda d: (getattr(d, 'task_path', ''), getattr(d, 'task_id', ''), getattr(d, 'idx', 0)))
        return [(d.type.encode(), blob) for d in docs if (blob := getattr(d, '$.blob', getattr(d, 'blob', None))) is not None]

Related patterns

Have you seen this in your site?

Connect AgentMinds to match against your tech stack automatically.

Run diagnostics