Skip to content

sink/dynamo

import "github.com/stablekernel/crucible/sink/dynamo"

Package dynamo is a sink destination that persists payloads to Amazon DynamoDB through the AWS SDK for Go v2. It depends only on the DynamoDB service client and crucible/sink. Register a transformer that turns each payload type into one of the write operations exposed here (PutItem, UpdateItem, DeleteItem, TransactWrite, BatchWrite), then attach the result of New to a sink.Manifold.

Experimental (pre-v1); the API may change until the suite locks v1.0.0.

func BatchWrite(in *dynamodb.BatchWriteItemInput) csink.Op[Client]

BatchWrite returns an Op that issues a batch of put and delete requests. The SDK reports per-item failures via UnprocessedItems on the output; this Op returns only the request-level error, so a caller needing retry on unprocessed items should compose a custom Op instead.

func DeleteItem(in *dynamodb.DeleteItemInput) csink.Op[Client]

DeleteItem returns an Op that removes a single item identified by the input’s key.

func New(client Client, reg *csink.Registry[csink.Op[Client]], opts ...csink.EmitterOption) csink.Outlet

New builds an Outlet that applies each payload’s registered Op[Client] to client. The outlet is named “dynamo” unless overridden with sink.WithName.

Example

Apache-2.0
package main
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
csink "github.com/stablekernel/crucible/sink"
"github.com/stablekernel/crucible/sink/dynamo"
)
// recordingClient is a stand-in dynamo.Client that records the tables it writes.
type recordingClient struct{ tables []string }
func (r *recordingClient) PutItem(_ context.Context, in *dynamodb.PutItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) {
r.tables = append(r.tables, aws.ToString(in.TableName))
return &dynamodb.PutItemOutput{}, nil
}
func (r *recordingClient) UpdateItem(_ context.Context, _ *dynamodb.UpdateItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
return &dynamodb.UpdateItemOutput{}, nil
}
func (r *recordingClient) DeleteItem(_ context.Context, _ *dynamodb.DeleteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) {
return &dynamodb.DeleteItemOutput{}, nil
}
func (r *recordingClient) TransactWriteItems(_ context.Context, _ *dynamodb.TransactWriteItemsInput, _ ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error) {
return &dynamodb.TransactWriteItemsOutput{}, nil
}
func (r *recordingClient) BatchWriteItem(_ context.Context, _ *dynamodb.BatchWriteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error) {
return &dynamodb.BatchWriteItemOutput{}, nil
}
type userRegistered struct{ Email string }
func main() {
client := &recordingClient{}
reg := dynamo.NewRegistry()
csink.Register(reg, func(_ context.Context, u userRegistered) csink.Op[dynamo.Client] {
return dynamo.PutItem(&dynamodb.PutItemInput{
TableName: aws.String("users"),
Item: map[string]types.AttributeValue{"email": &types.AttributeValueMemberS{Value: u.Email}},
})
})
outlet := dynamo.New(client, reg)
_ = outlet.Sink(context.Background(), userRegistered{Email: "a@example.com"})
fmt.Println(client.tables[0])
}
users

func NewRegistry() *csink.Registry[csink.Op[Client]]

NewRegistry returns an empty registry of Op[Client] for callers to populate with sink.Register.

func PutItem(in *dynamodb.PutItemInput) csink.Op[Client]

PutItem returns an Op that creates or replaces a single item. The input carries the table name, the item attributes, and any condition expression.

func TransactWrite(in *dynamodb.TransactWriteItemsInput) csink.Op[Client]

TransactWrite returns an Op that applies up to the service-limited set of writes as a single all-or-nothing transaction.

func UpdateItem(in *dynamodb.UpdateItemInput) csink.Op[Client]

UpdateItem returns an Op that applies an update expression to a single item identified by the input’s key.

Client is the narrow DynamoDB surface this destination needs. It declares only the write operations the package issues, so consumers wire the real *dynamodb.Client (which satisfies it structurally) while tests use a hand-rolled fake.

type Client interface {
PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
BatchWriteItem(ctx context.Context, params *dynamodb.BatchWriteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error)
}

Generated by gomarkdoc