sink/dynamo
    import "github.com/stablekernel/crucible/sink/dynamo"

Package dynamo is a sink destination that persists payloads to Amazon DynamoDB through the AWS SDK for Go v2. It depends only on the DynamoDB service client and crucible/sink. Register a transformer that turns each payload type into one of the write operations exposed here (PutItem, UpdateItem, DeleteItem, TransactWrite, BatchWrite), then attach the result of New to a sink.Manifold.
Stability
Experimental (pre-v1); the API may change until the suite locks v1.0.0.
- Variables
- func BatchWrite(in *dynamodb.BatchWriteItemInput) csink.Op[Client]
- func BatchWriteChecked(in *dynamodb.BatchWriteItemInput) csink.Op[Client]
- func DeleteItem(in *dynamodb.DeleteItemInput) csink.Op[Client]
- func New(client Client, reg *csink.Registry[csink.Op[Client]], opts ...csink.EmitterOption) csink.Outlet
- func NewRegistry() *csink.Registry[csink.Op[Client]]
- func PutItem(in *dynamodb.PutItemInput) csink.Op[Client]
- func TransactWrite(in *dynamodb.TransactWriteItemsInput) csink.Op[Client]
- func UpdateItem(in *dynamodb.UpdateItemInput) csink.Op[Client]
- type Client
Variables
ErrUnprocessedItems reports that a BatchWriteItem call returned without a request-level error but left one or more items unprocessed (DynamoDB returns these via UnprocessedItems on the output, typically under throttling). BatchWriteChecked returns an error wrapping this sentinel so a caller can match the class with errors.Is and retry the unprocessed items.
    var ErrUnprocessedItems = errors.New("dynamo: BatchWriteItem left items unprocessed")

func BatchWrite
    func BatchWrite(in *dynamodb.BatchWriteItemInput) csink.Op[Client]

BatchWrite returns an Op that issues a batch of put and delete requests. The SDK reports per-item failures via UnprocessedItems on the output; this Op returns only the request-level error and ignores unprocessed items. Use BatchWriteChecked when a left-behind item must surface as an error.
func BatchWriteChecked
    func BatchWriteChecked(in *dynamodb.BatchWriteItemInput) csink.Op[Client]

BatchWriteChecked returns an Op that issues a batch of put and delete requests and treats unprocessed items as a failure. DynamoDB returns HTTP 200 with a populated UnprocessedItems map when it could not write every item (commonly under throttling), so the request-level error is nil even though work was dropped. The Op inspects the response and, when any item is unprocessed, returns an error that wraps ErrUnprocessedItems and reports how many tables and items were left behind. Pair it with sink.Reservoir or a retry middleware to resubmit the unprocessed items.
func DeleteItem
    func DeleteItem(in *dynamodb.DeleteItemInput) csink.Op[Client]

DeleteItem returns an Op that removes a single item identified by the input’s key.
func New
    func New(client Client, reg *csink.Registry[csink.Op[Client]], opts ...csink.EmitterOption) csink.Outlet

New builds an Outlet that applies each payload’s registered Op[Client] to client. The outlet is named “dynamo” unless overridden with sink.WithName.
Example

    package main

    import (
        "context"
        "fmt"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

        csink "github.com/stablekernel/crucible/sink"
        "github.com/stablekernel/crucible/sink/dynamo"
    )

    // recordingClient is a stand-in dynamo.Client that records the tables it writes.
    type recordingClient struct{ tables []string }

    func (r *recordingClient) PutItem(_ context.Context, in *dynamodb.PutItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) {
        r.tables = append(r.tables, aws.ToString(in.TableName))
        return &dynamodb.PutItemOutput{}, nil
    }

    func (r *recordingClient) UpdateItem(_ context.Context, _ *dynamodb.UpdateItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
        return &dynamodb.UpdateItemOutput{}, nil
    }

    func (r *recordingClient) DeleteItem(_ context.Context, _ *dynamodb.DeleteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) {
        return &dynamodb.DeleteItemOutput{}, nil
    }

    func (r *recordingClient) TransactWriteItems(_ context.Context, _ *dynamodb.TransactWriteItemsInput, _ ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error) {
        return &dynamodb.TransactWriteItemsOutput{}, nil
    }

    func (r *recordingClient) BatchWriteItem(_ context.Context, _ *dynamodb.BatchWriteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error) {
        return &dynamodb.BatchWriteItemOutput{}, nil
    }

    type userRegistered struct{ Email string }

    func main() {
        client := &recordingClient{}
        reg := dynamo.NewRegistry()
        csink.Register(reg, func(_ context.Context, u userRegistered) csink.Op[dynamo.Client] {
            return dynamo.PutItem(&dynamodb.PutItemInput{
                TableName: aws.String("users"),
                Item:      map[string]types.AttributeValue{"email": &types.AttributeValueMemberS{Value: u.Email}},
            })
        })

        outlet := dynamo.New(client, reg)
        _ = outlet.Sink(context.Background(), userRegistered{Email: "a@example.com"})

        fmt.Println(client.tables[0])
    }

Output

    users

func NewRegistry
    func NewRegistry() *csink.Registry[csink.Op[Client]]

NewRegistry returns an empty registry of Op[Client] for callers to populate with sink.Register.
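To make the registry idea concrete, here is a rough standard-library sketch of a registry keyed by payload type. It is an assumption about the general pattern, not crucible's implementation: `op`, `registry`, `newRegistry`, `register`, `lookup`, and `tableLog` are all hypothetical names, and the transformer omits the context.Context parameter that the real sink.Register callback takes.

```go
package main

import (
	"fmt"
	"reflect"
)

// op is a hypothetical stand-in for csink.Op[Client]: a deferred write
// that runs against a client of type C.
type op[C any] func(client C) error

// registry maps a payload's dynamic type to a transformer that builds an op.
type registry[C any] struct {
	byType map[reflect.Type]func(payload any) op[C]
}

// newRegistry returns an empty registry, analogous in spirit to NewRegistry.
func newRegistry[C any]() *registry[C] {
	return &registry[C]{byType: make(map[reflect.Type]func(payload any) op[C])}
}

// register stores a typed transformer under P's reflect.Type.
func register[P, C any](r *registry[C], fn func(P) op[C]) {
	key := reflect.TypeOf((*P)(nil)).Elem()
	r.byType[key] = func(payload any) op[C] {
		return fn(payload.(P)) // safe: lookup only dispatches values of type P here
	}
}

// lookup builds the op for payload, or reports false when the payload's
// type was never registered.
func (r *registry[C]) lookup(payload any) (op[C], bool) {
	fn, ok := r.byType[reflect.TypeOf(payload)]
	if !ok {
		return nil, false
	}
	return fn(payload), true
}

// tableLog is a toy client that records which tables were written.
type tableLog struct{ tables []string }

type userRegistered struct{ Email string }

func main() {
	reg := newRegistry[*tableLog]()
	register(reg, func(u userRegistered) op[*tableLog] {
		return func(c *tableLog) error {
			c.tables = append(c.tables, "users")
			return nil
		}
	})

	client := &tableLog{}
	if o, ok := reg.lookup(userRegistered{Email: "a@example.com"}); ok {
		_ = o(client)
	}
	_, ok := reg.lookup(42) // int was never registered
	fmt.Println(client.tables, ok) // prints "[users] false"
}
```

Keying on the payload type keeps the outlet generic: adding a new payload means registering one more transformer rather than changing the code that performs the write.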
func PutItem
    func PutItem(in *dynamodb.PutItemInput) csink.Op[Client]

PutItem returns an Op that creates or replaces a single item. The input carries the table name, the item attributes, and any condition expression.
func TransactWrite
    func TransactWrite(in *dynamodb.TransactWriteItemsInput) csink.Op[Client]

TransactWrite returns an Op that applies a set of writes, bounded by the service's per-transaction limit, as a single all-or-nothing transaction.
func UpdateItem
    func UpdateItem(in *dynamodb.UpdateItemInput) csink.Op[Client]

UpdateItem returns an Op that applies an update expression to a single item identified by the input’s key.
type Client
Client is the narrow DynamoDB surface this destination needs. It declares only the write operations the package issues, so consumers wire the real *dynamodb.Client (which satisfies it structurally) while tests use a hand-rolled fake.
    type Client interface {
        PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
        UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
        DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
        TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
        BatchWriteItem(ctx context.Context, params *dynamodb.BatchWriteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error)
    }

Generated by gomarkdoc