sink/dynamo
```go
import "github.com/stablekernel/crucible/sink/dynamo"
```

Package dynamo is a sink destination that persists payloads to Amazon DynamoDB through the AWS SDK for Go v2. It depends only on the DynamoDB service client and crucible/sink. Register a transformer that turns each payload type into one of the write operations exposed here (PutItem, UpdateItem, DeleteItem, TransactWrite, BatchWrite), then attach the result of New to a sink.Manifold.
Stability
Experimental (pre-v1); the API may change until the suite locks v1.0.0.
- func BatchWrite(in *dynamodb.BatchWriteItemInput) csink.Op[Client]
- func DeleteItem(in *dynamodb.DeleteItemInput) csink.Op[Client]
- func New(client Client, reg *csink.Registry[csink.Op[Client]], opts ...csink.EmitterOption) csink.Outlet
- func NewRegistry() *csink.Registry[csink.Op[Client]]
- func PutItem(in *dynamodb.PutItemInput) csink.Op[Client]
- func TransactWrite(in *dynamodb.TransactWriteItemsInput) csink.Op[Client]
- func UpdateItem(in *dynamodb.UpdateItemInput) csink.Op[Client]
- type Client
func BatchWrite
```go
func BatchWrite(in *dynamodb.BatchWriteItemInput) csink.Op[Client]
```

BatchWrite returns an Op that issues a batch of put and delete requests. The SDK reports per-item failures via UnprocessedItems on the output; this Op returns only the request-level error, so a caller that needs to retry unprocessed items should compose a custom Op instead.
func DeleteItem
```go
func DeleteItem(in *dynamodb.DeleteItemInput) csink.Op[Client]
```

DeleteItem returns an Op that removes a single item identified by the input's key.
func New
```go
func New(client Client, reg *csink.Registry[csink.Op[Client]], opts ...csink.EmitterOption) csink.Outlet
```

New builds an Outlet that applies each payload's registered Op[Client] to client. The outlet is named "dynamo" unless overridden with sink.WithName.
Example
```go
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

	csink "github.com/stablekernel/crucible/sink"
	"github.com/stablekernel/crucible/sink/dynamo"
)

// recordingClient is a stand-in dynamo.Client that records the tables it writes.
type recordingClient struct{ tables []string }

func (r *recordingClient) PutItem(_ context.Context, in *dynamodb.PutItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) {
	r.tables = append(r.tables, aws.ToString(in.TableName))
	return &dynamodb.PutItemOutput{}, nil
}

func (r *recordingClient) UpdateItem(_ context.Context, _ *dynamodb.UpdateItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
	return &dynamodb.UpdateItemOutput{}, nil
}

func (r *recordingClient) DeleteItem(_ context.Context, _ *dynamodb.DeleteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) {
	return &dynamodb.DeleteItemOutput{}, nil
}

func (r *recordingClient) TransactWriteItems(_ context.Context, _ *dynamodb.TransactWriteItemsInput, _ ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error) {
	return &dynamodb.TransactWriteItemsOutput{}, nil
}

func (r *recordingClient) BatchWriteItem(_ context.Context, _ *dynamodb.BatchWriteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error) {
	return &dynamodb.BatchWriteItemOutput{}, nil
}

type userRegistered struct{ Email string }

func main() {
	client := &recordingClient{}
	reg := dynamo.NewRegistry()
	csink.Register(reg, func(_ context.Context, u userRegistered) csink.Op[dynamo.Client] {
		return dynamo.PutItem(&dynamodb.PutItemInput{
			TableName: aws.String("users"),
			Item:      map[string]types.AttributeValue{"email": &types.AttributeValueMemberS{Value: u.Email}},
		})
	})

	outlet := dynamo.New(client, reg)
	_ = outlet.Sink(context.Background(), userRegistered{Email: "a@example.com"})

	fmt.Println(client.tables[0])
}
```

Output

```
users
```

func NewRegistry
```go
func NewRegistry() *csink.Registry[csink.Op[Client]]
```

NewRegistry returns an empty registry of Op[Client] for callers to populate with sink.Register.
func PutItem
```go
func PutItem(in *dynamodb.PutItemInput) csink.Op[Client]
```

PutItem returns an Op that creates or replaces a single item. The input carries the table name, the item attributes, and any condition expression.
func TransactWrite
```go
func TransactWrite(in *dynamodb.TransactWriteItemsInput) csink.Op[Client]
```

TransactWrite returns an Op that applies a set of writes, up to the service's per-transaction limit, as a single all-or-nothing transaction.
func UpdateItem
```go
func UpdateItem(in *dynamodb.UpdateItemInput) csink.Op[Client]
```

UpdateItem returns an Op that applies an update expression to a single item identified by the input's key.
type Client
Client is the narrow DynamoDB surface this destination needs. It declares only the write operations the package issues, so consumers wire the real *dynamodb.Client (which satisfies it structurally) while tests use a hand-rolled fake.
```go
type Client interface {
	PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
	UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
	DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
	TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
	BatchWriteItem(ctx context.Context, params *dynamodb.BatchWriteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error)
}
```

Generated by gomarkdoc