Skip to content

sink/dynamo

import "github.com/stablekernel/crucible/sink/dynamo"

Package dynamo is a sink destination that persists payloads to Amazon DynamoDB through the AWS SDK for Go v2. It depends only on the DynamoDB service client and crucible/sink. Register a transformer that turns each payload type into one of the write operations exposed here (PutItem, UpdateItem, DeleteItem, TransactWrite, BatchWrite), then attach the result of New to a sink.Manifold.

Experimental (pre-v1); the API may change until the suite locks v1.0.0.

ErrUnprocessedItems reports that a BatchWriteItem call returned without a request-level error but left one or more items unprocessed (DynamoDB returns these via UnprocessedItems on the output, typically under throttling). BatchWriteChecked returns an error wrapping this sentinel so a caller can match the class with errors.Is and retry the unprocessed items.

var ErrUnprocessedItems = errors.New("dynamo: BatchWriteItem left items unprocessed")

func BatchWrite(in *dynamodb.BatchWriteItemInput) csink.Op[Client]

BatchWrite returns an Op that issues a batch of put and delete requests. The SDK reports per-item failures via UnprocessedItems on the output; this Op returns only the request-level error and ignores unprocessed items. Use BatchWriteChecked when a left-behind item must surface as an error.

func BatchWriteChecked(in *dynamodb.BatchWriteItemInput) csink.Op[Client]

BatchWriteChecked returns an Op that issues a batch of put and delete requests and treats unprocessed items as a failure. DynamoDB returns HTTP 200 with a populated UnprocessedItems map when it could not write every item (commonly under throttling), so the request-level error is nil even though work was dropped. The Op inspects the response and, when any item is unprocessed, returns an error wrapping ErrUnprocessedItems reporting how many tables and items were left behind. Pair it with sink.Reservoir or a retry middleware to resubmit the unprocessed items.

func DeleteItem(in *dynamodb.DeleteItemInput) csink.Op[Client]

DeleteItem returns an Op that removes a single item identified by the input’s key.

func New(client Client, reg *csink.Registry[csink.Op[Client]], opts ...csink.EmitterOption) csink.Outlet

New builds an Outlet that applies each payload’s registered Op[Client] to client. The outlet is named “dynamo” unless overridden with sink.WithName.

Example

Apache-2.0
package main
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
csink "github.com/stablekernel/crucible/sink"
"github.com/stablekernel/crucible/sink/dynamo"
)
// recordingClient is a stand-in dynamo.Client that records the tables it writes.
type recordingClient struct{ tables []string }
func (r *recordingClient) PutItem(_ context.Context, in *dynamodb.PutItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) {
r.tables = append(r.tables, aws.ToString(in.TableName))
return &dynamodb.PutItemOutput{}, nil
}
func (r *recordingClient) UpdateItem(_ context.Context, _ *dynamodb.UpdateItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
return &dynamodb.UpdateItemOutput{}, nil
}
func (r *recordingClient) DeleteItem(_ context.Context, _ *dynamodb.DeleteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) {
return &dynamodb.DeleteItemOutput{}, nil
}
func (r *recordingClient) TransactWriteItems(_ context.Context, _ *dynamodb.TransactWriteItemsInput, _ ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error) {
return &dynamodb.TransactWriteItemsOutput{}, nil
}
func (r *recordingClient) BatchWriteItem(_ context.Context, _ *dynamodb.BatchWriteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error) {
return &dynamodb.BatchWriteItemOutput{}, nil
}
type userRegistered struct{ Email string }
func main() {
client := &recordingClient{}
reg := dynamo.NewRegistry()
csink.Register(reg, func(_ context.Context, u userRegistered) csink.Op[dynamo.Client] {
return dynamo.PutItem(&dynamodb.PutItemInput{
TableName: aws.String("users"),
Item: map[string]types.AttributeValue{"email": &types.AttributeValueMemberS{Value: u.Email}},
})
})
outlet := dynamo.New(client, reg)
_ = outlet.Sink(context.Background(), userRegistered{Email: "a@example.com"})
fmt.Println(client.tables[0])
}
users

func NewRegistry() *csink.Registry[csink.Op[Client]]

NewRegistry returns an empty registry of Op[Client] for callers to populate with sink.Register.

func PutItem(in *dynamodb.PutItemInput) csink.Op[Client]

PutItem returns an Op that creates or replaces a single item. The input carries the table name, the item attributes, and any condition expression.

func TransactWrite(in *dynamodb.TransactWriteItemsInput) csink.Op[Client]

TransactWrite returns an Op that applies up to the service-limited set of writes as a single all-or-nothing transaction.

func UpdateItem(in *dynamodb.UpdateItemInput) csink.Op[Client]

UpdateItem returns an Op that applies an update expression to a single item identified by the input’s key.

Client is the narrow DynamoDB surface this destination needs. It declares only the write operations the package issues, so consumers wire the real *dynamodb.Client (which satisfies it structurally) while tests use a hand-rolled fake.

type Client interface {
PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
BatchWriteItem(ctx context.Context, params *dynamodb.BatchWriteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error)
}

Generated by gomarkdoc