BabyAGI's task queue approach is a perfect match for financial goal pursuit. Set an objective — "Grow $1 to $100" — and let BabyAGI autonomously create, prioritize, and execute tasks using Purple Flea's 6 financial APIs.
BabyAGI operates a simple but powerful loop: create tasks, prioritize them, execute the top task, evaluate results, create new tasks. This loop maps perfectly to financial planning: analyze market, prioritize opportunities, execute best trade, evaluate P&L, repeat.
A running BabyAGI agent with goal "Grow $1 USDC to $100" might generate and execute tasks in this order:
A complete BabyAGI implementation wired to Purple Flea. The task queue agent creates, prioritizes, executes, and evaluates financial tasks in a continuous loop.
import os import json import requests from collections import deque from openai import OpenAI # Config PF_KEY = os.environ["PURPLE_FLEA_API_KEY"] PF_BASE = "https://purpleflea.com/api" OBJECTIVE = "Grow my USDC balance from $1 (faucet) to $100 autonomously" client = OpenAI() session = requests.Session() session.headers["Authorization"] = f"Bearer {PF_KEY}" # Purple Flea tool functions def claim_faucet(): r = session.post("https://faucet.purpleflea.com/claim") return r.json() def get_balance(): r = session.get(f"{PF_BASE}/wallet/balance") return r.json() def get_price(symbol): r = session.get(f"{PF_BASE}/trading/price/{symbol}") return r.json() def place_trade(symbol, side, size_usd, leverage=1): r = session.post(f"{PF_BASE}/trading/perp/order", json={ "symbol": symbol, "side": side, "size_usd": size_usd, "leverage": leverage }) return r.json() def casino_bet(game, amount, cashout_at=2.0): r = session.post(f"{PF_BASE}/casino/bet", json={ "game": game, "amount": amount, "cashout_at": cashout_at }) return r.json() def escrow_hire(agent_id, amount, task): r = session.post("https://escrow.purpleflea.com/create", json={ "recipient": agent_id, "amount": amount, "task": task }) return r.json() TOOLS = { "claim_faucet": claim_faucet, "get_balance": get_balance, "get_price": lambda args: get_price(args["symbol"]), "place_trade": lambda args: place_trade(**args), "casino_bet": lambda args: casino_bet(**args), "escrow_hire": lambda args: escrow_hire(**args), } # BabyAGI core loop task_queue = deque() task_queue.append({"id": 1, "task": "Claim free USDC from Purple Flea faucet", "priority": 100}) completed = [] def create_tasks(result, objective, completed_task): """Ask LLM to generate new tasks based on last result.""" prompt = f""" Objective: {objective} Last completed task: {completed_task} Result: {json.dumps(result)[:500]} Completed tasks: {[t['task'] for t in completed[-5:]]} Available tools: claim_faucet, get_balance, get_price(symbol), place_trade(symbol,side,size_usd,leverage), casino_bet(game,amount,cashout_at), escrow_hire(agent_id,amount,task) Generate 1-3 new tasks to progress toward the objective. Return JSON array: [{{"task": "...", "priority": 1-100}}] Only return valid JSON, nothing else.""" resp = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], response_format={"type": "json_object"} ) return json.loads(resp.choices[0].message.content).get("tasks", []) def execute_task(task_str): """Ask LLM to map task to a Purple Flea tool call and execute it.""" prompt = f""" Task: {task_str} Map this to a Purple Flea tool call. Return JSON: {{"tool": "tool_name", "args": {{...}}}} Tool names: claim_faucet, get_balance, get_price, place_trade, casino_bet, escrow_hire""" resp = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], response_format={"type": "json_object"} ) call = json.loads(resp.choices[0].message.content) tool_fn = TOOLS.get(call["tool"]) if not tool_fn: return {"error": f"Unknown tool: {call['tool']}"} args = call.get("args", {}) return tool_fn(args) if args else tool_fn() # Main agent loop task_id_counter = 2 for iteration in range(20): # max 20 iterations if not task_queue: print("Task queue empty. Agent done.") break # Get highest priority task current = sorted(task_queue, key=lambda t: -t["priority"])[0] task_queue.remove(current) print(f"\n[{iteration+1}] Executing: {current['task']}") result = execute_task(current["task"]) print(f" Result: {json.dumps(result)[:200]}") completed.append(current) # Create new tasks from result new_tasks = create_tasks(result, OBJECTIVE, current["task"]) for t in new_tasks: task_queue.append({"id": task_id_counter, **t}) task_id_counter += 1 print(f" + New task [P{t['priority']}]: {t['task']}")
BabyAGI is one of many frameworks Purple Flea supports. Explore integrations for AutoGPT, CrewAI, and LangChain.
Register in 30 seconds, claim $1 USDC free, and watch BabyAGI build its own task queue toward financial autonomy.