    await setup.setup_all()
    print("Resources ready:", setup._resources)

| Package         | Purpose                    |
|-----------------|----------------------------|
| async-lru       | Async caching              |
| aiofiles        | Async file I/O             |
| aioredis        | Redis client               |
| asyncpg         | PostgreSQL driver          |
| anyio / asyncio | Low-level async primitives |

    async def close_postgres(resource):
        # Placeholder cleanup; a real version would close the connection here.
        print(f"Closing {resource}")

aiosetups is a conceptual utility module that provides reusable patterns to initialize, manage, and gracefully shut down async resources.

Typical use case

Instead of manually managing:

    def get(self, name: str):
        # Returns None if the resource has not been initialized.
        return self._resources.get(name)

    async def init_postgres():
        # Imagine asyncpg.connect(...)
        return {"conn": "fake pg connection"}

    def add(self, name: str, init_func, cleanup_func=None):
        async def _init():
            self._resources[name] = await init_func()

        self._init_tasks.append(_init)

        if cleanup_func:
            async def _clean():
                await cleanup_func(self._resources[name])

            # Insert at the front so cleanup runs in reverse order of setup.
            self._cleanup_tasks.insert(0, _clean)

    async def setup_all(self):
        # Run the registered initializers one at a time, in registration order.
        for task in self._init_tasks:
            await task()
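The fragments above can be assembled into one runnable sketch. The class name `AsyncSetup`, the `__init__` body, the `cleanup_all` method, and the `make_fake_resource` helper are assumptions made for illustration, since the original does not show them; the resources are plain dictionaries rather than real connections.

```python
import asyncio


class AsyncSetup:
    """Hypothetical container for the add/get/setup_all methods shown above."""

    def __init__(self):
        # Assumed attributes, inferred from how the methods use them.
        self._resources = {}
        self._init_tasks = []
        self._cleanup_tasks = []

    def add(self, name, init_func, cleanup_func=None):
        async def _init():
            self._resources[name] = await init_func()

        self._init_tasks.append(_init)

        if cleanup_func:
            async def _clean():
                await cleanup_func(self._resources[name])

            # Insert at the front so cleanup runs in reverse order of setup.
            self._cleanup_tasks.insert(0, _clean)

    def get(self, name):
        return self._resources.get(name)

    async def setup_all(self):
        for task in self._init_tasks:
            await task()

    async def cleanup_all(self):
        # Assumed counterpart to setup_all; not shown in the original.
        for task in self._cleanup_tasks:
            await task()
        self._resources.clear()


def make_fake_resource(name, log):
    """Hypothetical helper: builds a fake init/close pair that records calls."""

    async def init():
        log.append(f"init {name}")
        return {"conn": f"fake {name} connection"}

    async def close(resource):
        log.append(f"close {name}")

    return init, close


async def main():
    log = []
    setup = AsyncSetup()
    setup.add("postgres", *make_fake_resource("postgres", log))
    setup.add("redis", *make_fake_resource("redis", log))

    await setup.setup_all()
    try:
        print("Resources ready:", setup._resources)
    finally:
        # Runs even if the body raises, so resources are always released.
        await setup.cleanup_all()
    return log


if __name__ == "__main__":
    asyncio.run(main())
```

Because `_cleanup_tasks` is built with `insert(0, ...)`, the resource registered last is closed first, which matters when a later resource depends on an earlier one.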

