diff --git a/docs/memory.md b/docs/memory.md index 580dc070..0ebf87e9 100644 --- a/docs/memory.md +++ b/docs/memory.md @@ -198,6 +198,66 @@ Memory events expire after a configurable duration (7-365 days, default 30): } ``` +## Memory Record Streaming + +Memory record streaming delivers real-time events when memory records are created, updated, or deleted. Events are +pushed to a delivery target in your account, enabling event-driven architectures without polling. + +### Enabling Streaming + +Via CLI flags: + +```bash +agentcore add memory \ + --name MyMemory \ + --strategies SEMANTIC \ + --data-stream-arn arn:aws:kinesis:us-west-2:123456789012:stream/my-stream \ + --stream-content-level FULL_CONTENT +``` + +For advanced configurations (e.g. multiple delivery targets), pass the full JSON: + +```bash +agentcore add memory \ + --name MyMemory \ + --strategies SEMANTIC \ + --stream-delivery-resources '{"resources":[{"kinesis":{"dataStreamArn":"arn:aws:kinesis:us-west-2:123456789012:stream/my-stream","contentConfigurations":[{"type":"MEMORY_RECORDS","level":"FULL_CONTENT"}]}}]}' +``` + +### Configuration + +```json +{ + "type": "AgentCoreMemory", + "name": "MyMemory", + "eventExpiryDuration": 30, + "strategies": [{ "type": "SEMANTIC" }], + "streamDeliveryResources": { + "resources": [ + { + "kinesis": { + "dataStreamArn": "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream", + "contentConfigurations": [{ "type": "MEMORY_RECORDS", "level": "FULL_CONTENT" }] + } + } + ] + } +} +``` + +### Content Level + +| Level | Description | +| --------------- | ---------------------------------------------------------- | +| `FULL_CONTENT` | Events include memory record text and all metadata | +| `METADATA_ONLY` | Events include only metadata (IDs, timestamps, namespaces) | + +The CDK construct automatically grants the memory execution role permission to publish to the configured delivery +target. + +For more details, see the +[Memory Record Streaming documentation](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-record-streaming.html). + ## Using Memory in Code The memory ID is available via environment variable: diff --git a/src/cli/commands/add/__tests__/validate.test.ts b/src/cli/commands/add/__tests__/validate.test.ts index 7c23f71b..6c4e6795 100644 --- a/src/cli/commands/add/__tests__/validate.test.ts +++ b/src/cli/commands/add/__tests__/validate.test.ts @@ -738,6 +738,71 @@ describe('validate', () => { valid: true, }); }); + + // Streaming validation + it('accepts valid streaming options', () => { + expect( + validateAddMemoryOptions({ + ...validMemoryOptions, + dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test', + contentLevel: 'FULL_CONTENT', + }) + ).toEqual({ valid: true }); + }); + + it('accepts dataStreamArn without contentLevel (defaults to FULL_CONTENT)', () => { + expect( + validateAddMemoryOptions({ + ...validMemoryOptions, + dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test', + }) + ).toEqual({ valid: true }); + }); + + it('rejects contentLevel without dataStreamArn', () => { + const result = validateAddMemoryOptions({ ...validMemoryOptions, contentLevel: 'FULL_CONTENT' }); + expect(result.valid).toBe(false); + expect(result.error).toContain('--data-stream-arn is required'); + }); + + it('rejects invalid contentLevel', () => { + const result = validateAddMemoryOptions({ + ...validMemoryOptions, + dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test', + contentLevel: 'INVALID', + }); + expect(result.valid).toBe(false); + expect(result.error).toContain('Invalid content level'); + }); + + it('rejects invalid deliveryType', () => { + const result = validateAddMemoryOptions({ ...validMemoryOptions, deliveryType: 'sqs' }); + expect(result.valid).toBe(false); + expect(result.error).toContain('Invalid delivery type'); + }); + + it('accepts valid deliveryType', () => { + expect(validateAddMemoryOptions({ ...validMemoryOptions, deliveryType: 'kinesis' })).toEqual({ valid: true }); + }); + + it('rejects dataStreamArn not starting with arn:', () => { + const result = validateAddMemoryOptions({ + ...validMemoryOptions, + dataStreamArn: 'not-an-arn', + }); + expect(result.valid).toBe(false); + expect(result.error).toContain('valid ARN'); + }); + + it('rejects combining streamDeliveryResources with flat flags', () => { + const result = validateAddMemoryOptions({ + ...validMemoryOptions, + dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test', + streamDeliveryResources: '{"resources":[]}', + }); + expect(result.valid).toBe(false); + expect(result.error).toContain('cannot be combined'); + }); }); describe('validateAddIdentityOptions', () => { diff --git a/src/cli/commands/add/types.ts b/src/cli/commands/add/types.ts index 17a9facc..70f5fefd 100644 --- a/src/cli/commands/add/types.ts +++ b/src/cli/commands/add/types.ts @@ -80,6 +80,10 @@ export interface AddMemoryOptions { name?: string; strategies?: string; expiry?: number; + deliveryType?: string; + dataStreamArn?: string; + contentLevel?: string; + streamDeliveryResources?: string; json?: boolean; } diff --git a/src/cli/commands/add/validate.ts b/src/cli/commands/add/validate.ts index 0e524f77..105f56bb 100644 --- a/src/cli/commands/add/validate.ts +++ b/src/cli/commands/add/validate.ts @@ -28,6 +28,8 @@ export interface ValidationResult { const MEMORY_OPTIONS = ['none', 'shortTerm', 'longAndShortTerm'] as const; const OIDC_WELL_KNOWN_SUFFIX = '/.well-known/openid-configuration'; const VALID_STRATEGIES = ['SEMANTIC', 'SUMMARIZATION', 'USER_PREFERENCE']; +const VALID_STREAM_CONTENT_LEVELS = ['FULL_CONTENT', 'METADATA_ONLY']; +const VALID_DELIVERY_TYPES = ['kinesis']; /** * Validate that a credential name exists in the project spec. @@ -447,6 +449,35 @@ export function validateAddMemoryOptions(options: AddMemoryOptions): ValidationR } } + if (options.streamDeliveryResources && (options.dataStreamArn || options.contentLevel)) { + return { + valid: false, + error: '--stream-delivery-resources cannot be combined with --data-stream-arn or --stream-content-level', + }; + } + + if (options.contentLevel && !options.dataStreamArn) { + return { valid: false, error: '--data-stream-arn is required when --stream-content-level is set' }; + } + + if (options.dataStreamArn && !options.dataStreamArn.startsWith('arn:')) { + return { valid: false, error: '--data-stream-arn must be a valid ARN (starts with arn:)' }; + } + + if (options.deliveryType && !VALID_DELIVERY_TYPES.includes(options.deliveryType)) { + return { + valid: false, + error: `Invalid delivery type. Must be one of: ${VALID_DELIVERY_TYPES.join(', ')}`, + }; + } + + if (options.contentLevel && !VALID_STREAM_CONTENT_LEVELS.includes(options.contentLevel)) { + return { + valid: false, + error: `Invalid content level. Must be one of: ${VALID_STREAM_CONTENT_LEVELS.join(', ')}`, + }; + } + return { valid: true }; } diff --git a/src/cli/primitives/MemoryPrimitive.tsx b/src/cli/primitives/MemoryPrimitive.tsx index df8dce93..2a56d476 100644 --- a/src/cli/primitives/MemoryPrimitive.tsx +++ b/src/cli/primitives/MemoryPrimitive.tsx @@ -1,6 +1,17 @@ import { findConfigRoot } from '../../lib'; -import type { Memory, MemoryStrategy, MemoryStrategyType } from '../../schema'; -import { DEFAULT_STRATEGY_NAMESPACES, MemorySchema } from '../../schema'; +import type { + Memory, + MemoryStrategy, + MemoryStrategyType, + StreamContentLevel, + StreamDeliveryResources, +} from '../../schema'; +import { + DEFAULT_STRATEGY_NAMESPACES, + MemorySchema, + StreamContentLevelSchema, + StreamDeliveryResourcesSchema, +} from '../../schema'; import { validateAddMemoryOptions } from '../commands/add/validate'; import { getErrorMessage } from '../errors'; import type { RemovalPreview, RemovalResult, SchemaChange } from '../operations/remove/types'; @@ -16,6 +27,12 @@ export interface AddMemoryOptions { name: string; strategies?: string; expiry?: number; + deliveryType?: string; + // Flat flags for the simple single-stream case + dataStreamArn?: string; + contentLevel?: string; + // Raw JSON for advanced/multi-target configurations. Takes precedence over flat flags. + streamDeliveryResources?: string; } /** @@ -42,10 +59,21 @@ export class MemoryPrimitive extends BasePrimitive ({ type: type as MemoryStrategyType })) : []; + const streamDeliveryResources = options.streamDeliveryResources + ? this.parseStreamDeliveryResources(options.streamDeliveryResources) + : options.dataStreamArn + ? this.buildStreamDeliveryResources({ + deliveryType: options.deliveryType ?? 'kinesis', + dataStreamArn: options.dataStreamArn, + contentLevel: StreamContentLevelSchema.parse(options.contentLevel ?? 'FULL_CONTENT'), + }) + : undefined; + const memory = await this.createMemory({ name: options.name, eventExpiryDuration: options.expiry ?? DEFAULT_EVENT_EXPIRY, strategies, + streamDeliveryResources, }); return { success: true, memoryName: memory.name }; @@ -129,73 +157,102 @@ export class MemoryPrimitive extends BasePrimitive', 'Event expiry duration in days (default: 30) [non-interactive]') + .option('--delivery-type ', 'Delivery target type (default: kinesis) [non-interactive]') + .option('--data-stream-arn ', 'Kinesis data stream ARN for memory record streaming [non-interactive]') + .option( + '--stream-content-level ', + 'Stream content level: FULL_CONTENT or METADATA_ONLY (default: FULL_CONTENT) [non-interactive]' + ) + .option( + '--stream-delivery-resources ', + 'Stream delivery config as JSON string (advanced, overrides flat flags) [non-interactive]' + ) .option('--json', 'Output as JSON [non-interactive]') - .action(async (cliOptions: { name?: string; strategies?: string; expiry?: string; json?: boolean }) => { - try { - if (!findConfigRoot()) { - console.error('No agentcore project found. Run `agentcore create` first.'); - process.exit(1); - } + .action( + async (cliOptions: { + name?: string; + strategies?: string; + expiry?: string; + deliveryType?: string; + dataStreamArn?: string; + streamContentLevel?: string; + streamDeliveryResources?: string; + json?: boolean; + }) => { + try { + if (!findConfigRoot()) { + console.error('No agentcore project found. Run `agentcore create` first.'); + process.exit(1); + } + + if (cliOptions.name || cliOptions.json) { + // CLI mode + const expiry = cliOptions.expiry ? parseInt(cliOptions.expiry, 10) : undefined; + const validation = validateAddMemoryOptions({ + name: cliOptions.name, + strategies: cliOptions.strategies, + expiry, + deliveryType: cliOptions.deliveryType, + dataStreamArn: cliOptions.dataStreamArn, + contentLevel: cliOptions.streamContentLevel, + streamDeliveryResources: cliOptions.streamDeliveryResources, + }); - if (cliOptions.name || cliOptions.json) { - // CLI mode - const expiry = cliOptions.expiry ? parseInt(cliOptions.expiry, 10) : undefined; - const validation = validateAddMemoryOptions({ - name: cliOptions.name, - strategies: cliOptions.strategies, - expiry, - }); + if (!validation.valid) { + if (cliOptions.json) { + console.log(JSON.stringify({ success: false, error: validation.error })); + } else { + console.error(validation.error); + } + process.exit(1); + } + + const result = await this.add({ + name: cliOptions.name!, + strategies: cliOptions.strategies, + expiry, + deliveryType: cliOptions.deliveryType, + dataStreamArn: cliOptions.dataStreamArn, + contentLevel: cliOptions.streamContentLevel, + streamDeliveryResources: cliOptions.streamDeliveryResources, + }); - if (!validation.valid) { if (cliOptions.json) { - console.log(JSON.stringify({ success: false, error: validation.error })); + console.log(JSON.stringify(result)); + } else if (result.success) { + console.log(`Added memory '${result.memoryName}'`); } else { - console.error(validation.error); + console.error(result.error); } - process.exit(1); + process.exit(result.success ? 0 : 1); + } else { + // TUI fallback — dynamic imports to avoid pulling ink (async) into registry + const [{ render }, { default: React }, { AddFlow }] = await Promise.all([ + import('ink'), + import('react'), + import('../tui/screens/add/AddFlow'), + ]); + const { clear, unmount } = render( + React.createElement(AddFlow, { + isInteractive: false, + onExit: () => { + clear(); + unmount(); + process.exit(0); + }, + }) + ); } - - const result = await this.add({ - name: cliOptions.name!, - strategies: cliOptions.strategies, - expiry, - }); - + } catch (error) { if (cliOptions.json) { - console.log(JSON.stringify(result)); - } else if (result.success) { - console.log(`Added memory '${result.memoryName}'`); + console.log(JSON.stringify({ success: false, error: getErrorMessage(error) })); } else { - console.error(result.error); + console.error(getErrorMessage(error)); } - process.exit(result.success ? 0 : 1); - } else { - // TUI fallback — dynamic imports to avoid pulling ink (async) into registry - const [{ render }, { default: React }, { AddFlow }] = await Promise.all([ - import('ink'), - import('react'), - import('../tui/screens/add/AddFlow'), - ]); - const { clear, unmount } = render( - React.createElement(AddFlow, { - isInteractive: false, - onExit: () => { - clear(); - unmount(); - process.exit(0); - }, - }) - ); - } - } catch (error) { - if (cliOptions.json) { - console.log(JSON.stringify({ success: false, error: getErrorMessage(error) })); - } else { - console.error(getErrorMessage(error)); + process.exit(1); } - process.exit(1); } - }); + ); this.registerRemoveSubcommand(removeCmd); } @@ -211,6 +268,7 @@ export class MemoryPrimitive extends BasePrimitive { const project = await this.readProjectSpec(); @@ -231,6 +289,7 @@ export class MemoryPrimitive extends BasePrimitive { }); expect(result.success).toBe(false); }); + + it('accepts memory with streamDeliveryResources', () => { + const result = MemorySchema.safeParse({ + type: 'AgentCoreMemory', + name: 'StreamMemory', + eventExpiryDuration: 30, + strategies: [{ type: 'SEMANTIC' }], + streamDeliveryResources: { + resources: [ + { + kinesis: { + dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test', + contentConfigurations: [{ type: 'MEMORY_RECORDS', level: 'FULL_CONTENT' }], + }, + }, + ], + }, + }); + expect(result.success).toBe(true); + }); + + it('accepts memory without streamDeliveryResources', () => { + const result = MemorySchema.safeParse({ + type: 'AgentCoreMemory', + name: 'NoStream', + eventExpiryDuration: 30, + strategies: [], + }); + expect(result.success).toBe(true); + expect(result.data?.streamDeliveryResources).toBeUndefined(); + }); + + it('rejects streamDeliveryResources with empty resources array', () => { + const result = MemorySchema.safeParse({ + type: 'AgentCoreMemory', + name: 'Test', + eventExpiryDuration: 30, + strategies: [], + streamDeliveryResources: { resources: [] }, + }); + expect(result.success).toBe(false); + }); + + it('rejects streamDeliveryResources with empty contentConfigurations', () => { + const result = MemorySchema.safeParse({ + type: 'AgentCoreMemory', + name: 'Test', + eventExpiryDuration: 30, + strategies: [], + streamDeliveryResources: { + resources: [ + { + kinesis: { dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test', contentConfigurations: [] }, + }, + ], + }, + }); + expect(result.success).toBe(false); + }); + + it('rejects streamDeliveryResources with empty dataStreamArn', () => { + const result = MemorySchema.safeParse({ + type: 'AgentCoreMemory', + name: 'Test', + eventExpiryDuration: 30, + strategies: [], + streamDeliveryResources: { + resources: [ + { + kinesis: { dataStreamArn: '', contentConfigurations: [{ type: 'MEMORY_RECORDS', level: 'FULL_CONTENT' }] }, + }, + ], + }, + }); + expect(result.success).toBe(false); + }); + + it('rejects invalid content level in streamDeliveryResources', () => { + const result = MemorySchema.safeParse({ + type: 'AgentCoreMemory', + name: 'Test', + eventExpiryDuration: 30, + strategies: [], + streamDeliveryResources: { + resources: [ + { + kinesis: { + dataStreamArn: 'arn:test', + contentConfigurations: [{ type: 'MEMORY_RECORDS', level: 'INVALID' }], + }, + }, + ], + }, + }); + expect(result.success).toBe(false); + }); }); describe('CredentialNameSchema', () => { diff --git a/src/schema/schemas/agentcore-project.ts b/src/schema/schemas/agentcore-project.ts index fda34160..c78f54ae 100644 --- a/src/schema/schemas/agentcore-project.ts +++ b/src/schema/schemas/agentcore-project.ts @@ -48,6 +48,34 @@ export const MemoryNameSchema = z 'Must begin with a letter and contain only alphanumeric characters and underscores (max 48 chars)' ); +export const StreamContentLevelSchema = z.enum(['FULL_CONTENT', 'METADATA_ONLY']); +export type StreamContentLevel = z.infer; + +// TODO: kinesis is currently the only supported delivery type. When additional types +// (e.g. S3, EventBridge) are added, this should become a discriminated union. +// Non-kinesis resources will produce a Zod error about the missing kinesis field. +export const StreamDeliveryResourcesSchema = z.object({ + resources: z + .array( + z.object({ + kinesis: z.object({ + dataStreamArn: z.string().min(1), + contentConfigurations: z + .array( + z.object({ + type: z.literal('MEMORY_RECORDS'), + level: StreamContentLevelSchema, + }) + ) + .min(1), + }), + }) + ) + .min(1), +}); + +export type StreamDeliveryResources = z.infer; + export const MemorySchema = z.object({ type: MemoryTypeSchema, name: MemoryNameSchema, @@ -63,6 +91,7 @@ export const MemorySchema = z.object({ type => `Duplicate memory strategy type: ${type}` ) ), + streamDeliveryResources: StreamDeliveryResourcesSchema.optional(), }); export type Memory = z.infer;