this repo has no description
40
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 83 lines 2.6 kB view raw
1from letta_client import Letta 2from typing import Optional 3 4def upsert_block(letta: Letta, label: str, value: str, **kwargs): 5 """ 6 Ensures that a block by this label exists. If the block exists, it will 7 replace content provided by kwargs with the values in this function call. 8 """ 9 # Get the list of blocks (v1.0: list returns page object) 10 blocks_page = letta.blocks.list(label=label) 11 blocks = blocks_page.items if hasattr(blocks_page, 'items') else blocks_page 12 13 # Check if we had any -- if not, create it 14 if len(blocks) == 0: 15 # Make the new block 16 new_block = letta.blocks.create( 17 label=label, 18 value=value, 19 **kwargs 20 ) 21 22 return new_block 23 24 if len(blocks) > 1: 25 raise Exception(f"{len(blocks)} blocks by the label '{label}' retrieved, label must identify a unique block") 26 27 else: 28 existing_block = blocks[0] 29 30 if kwargs.get('update', False): 31 # Remove 'update' from kwargs before passing to update 32 kwargs_copy = kwargs.copy() 33 kwargs_copy.pop('update', None) 34 35 updated_block = letta.blocks.update( 36 block_id = existing_block.id, 37 label = label, 38 value = value, 39 **kwargs_copy 40 ) 41 42 return updated_block 43 else: 44 return existing_block 45 46def upsert_agent(letta: Letta, name: str, **kwargs): 47 """ 48 Ensures that an agent by this label exists. If the agent exists, it will 49 update the agent to match kwargs. 50 """ 51 # Get the list of agents (v1.0: list returns page object) 52 agents_page = letta.agents.list(name=name) 53 agents = agents_page.items if hasattr(agents_page, 'items') else agents_page 54 55 # Check if we had any -- if not, create it 56 if len(agents) == 0: 57 # Make the new agent 58 new_agent = letta.agents.create( 59 name=name, 60 **kwargs 61 ) 62 63 return new_agent 64 65 if len(agents) > 1: 66 raise Exception(f"{len(agents)} agents by the name '{name}' retrieved, name must identify a unique agent") 67 68 else: 69 existing_agent = agents[0] 70 71 if kwargs.get('update', False): 72 # Remove 'update' from kwargs before passing to update 73 kwargs_copy = kwargs.copy() 74 kwargs_copy.pop('update', None) 75 76 updated_agent = letta.agents.update( 77 agent_id = existing_agent.id, 78 **kwargs_copy 79 ) 80 81 return updated_agent 82 else: 83 return existing_agent