Brokers

Add message brokers to your test environment. Dokkimi deploys a transparent proxy sidecar that captures all published and consumed messages without modifying wire traffic.

Required fields

| Field | Type | Description |
| --- | --- | --- |
| type | "BROKER" | Item type |
| name | string | Unique name (1-63 chars). Used as DNS hostname for service connections. |
| broker | enum | "amqp" or "kafka" |

Optional fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| description | string | | Human-readable description (max 500 chars) |
| image | string | per engine | Custom Docker image (e.g. "rabbitmq:3.13-management", "apache/kafka:3.9") |
| port | integer | per engine | Native broker port |
| healthCheck | string | | Health check endpoint or "tcp" |
| env | array | | Environment variables: [{ "name": "KEY", "value": "VALUE" }] |
| command | string[] | | Override Docker image CMD |
| minCpu | number | | Minimum CPU cores |
| minMemory | number | | Minimum memory in MB |
| maxCpu | number | | Maximum CPU cores |
| maxMemory | number | | Maximum memory in MB |
| stage | integer | 0 | Deployment stage; controls boot order |
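
As an illustration, several of the optional fields can be combined in a single item. This is only a sketch: the description text and the CPU and memory numbers are hypothetical and should be sized for your own environment, and stage is spelled out at its default of 0 purely for clarity.

```json
{
  "type": "BROKER",
  "name": "rabbitmq",
  "broker": "amqp",
  "description": "Illustrative RabbitMQ broker with explicit resource limits",
  "healthCheck": "tcp",
  "minCpu": 0.25,
  "maxCpu": 1,
  "minMemory": 256,
  "maxMemory": 512,
  "stage": 0
}
```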

Engine details

| Engine | Default image | Native port | Default connection string |
| --- | --- | --- | --- |
| amqp | rabbitmq:3 | 5672 | amqp://guest:guest@{name}:5672 |
| kafka | apache/kafka:4.3.1 | 9092 | {name}:9092 (bootstrap servers) |

How it works

When you add a BROKER item, Dokkimi deploys a broker-proxy sidecar alongside the broker container. The proxy sits between your services and the broker, transparently relaying all wire protocol traffic. It inspects produce/publish and consume/deliver operations to capture message metadata and bodies, which become available as $.messageLogs in your test assertions.

Your services connect to the broker using the item's name as the hostname and the broker's native port (5672 for AMQP, 9092 for Kafka). No code changes are needed, because the proxy is invisible to your application.

Kafka runs in KRaft mode (no ZooKeeper required) as a single-node broker, suitable for integration testing.

Examples

Basic RabbitMQ

{
  "type": "BROKER",
  "name": "rabbitmq",
  "broker": "amqp"
}

Basic Kafka

{
  "type": "BROKER",
  "name": "kafka",
  "broker": "kafka"
}

Custom image with credentials (AMQP)

{
  "type": "BROKER",
  "name": "rabbitmq",
  "broker": "amqp",
  "image": "rabbitmq:3.13-management",
  "env": [
    { "name": "RABBITMQ_DEFAULT_USER", "value": "myuser" },
    { "name": "RABBITMQ_DEFAULT_PASS", "value": "mypass" }
  ]
}

As a shared fragment (YAML)

# .dokkimi/shared/rabbitmq.yaml
type: BROKER
name: rabbitmq
broker: amqp

# .dokkimi/shared/kafka.yaml
type: BROKER
name: kafka
broker: kafka

Connecting services

Use the broker item's name as the hostname in your service's environment variables:

{
  "type": "SERVICE",
  "name": "order-service",
  "image": "order-service:latest",
  "port": 3000,
  "healthCheck": "/health",
  "env": [
    { "name": "AMQP_URL", "value": "amqp://guest:guest@rabbitmq:5672" }
  ]
}

{
  "type": "SERVICE",
  "name": "event-processor",
  "image": "event-processor:latest",
  "port": 3000,
  "healthCheck": "/health",
  "env": [
    { "name": "KAFKA_BROKERS", "value": "kafka:9092" }
  ]
}
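
If the broker item overrides the default credentials, as in the "Custom image with credentials (AMQP)" example, the service's connection string would typically need to carry the same user and password instead of guest:guest. A minimal sketch, reusing the placeholder myuser / mypass values from that example:

```json
{
  "type": "SERVICE",
  "name": "order-service",
  "image": "order-service:latest",
  "port": 3000,
  "healthCheck": "/health",
  "env": [
    { "name": "AMQP_URL", "value": "amqp://myuser:mypass@rabbitmq:5672" }
  ]
}
```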

Asserting on message logs

Captured messages are available via $.messageLogs in wait step assertions. Each entry contains:

| Field | Description |
| --- | --- |
| broker | Broker item name |
| brokerType | Protocol type ("amqp" or "kafka") |
| operation | "publish" / "deliver" (AMQP) or "produce" / "consume" (Kafka) |
| body | Message body (parsed JSON or raw string) |
| timestamp | ISO timestamp |

Protocol-specific metadata fields are also available at the top level of each entry:

| Protocol | Field | Description |
| --- | --- | --- |
| AMQP | exchange | Exchange the message was published to |
| AMQP | routingKey | Routing key used for the message |
| Kafka | topic | Kafka topic |
| Kafka | partition | Partition index |
| Kafka | key | Message key (string or parsed JSON) |
| Kafka | offset | Message offset (consume only) |
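
Putting the common and protocol-specific fields together, entries in $.messageLogs could look roughly like the following. All values here are made up for illustration, and real entries may order their keys differently or carry additional ones; only the field names are taken from the tables above.

```json
[
  {
    "broker": "rabbitmq",
    "brokerType": "amqp",
    "operation": "publish",
    "exchange": "events",
    "routingKey": "order-events",
    "body": { "type": "order.created", "orderId": 42 },
    "timestamp": "2025-01-01T12:00:00.000Z"
  },
  {
    "broker": "kafka",
    "brokerType": "kafka",
    "operation": "consume",
    "topic": "order-events",
    "partition": 0,
    "key": "order-42",
    "offset": 0,
    "body": { "orderId": 42 },
    "timestamp": "2025-01-01T12:00:01.000Z"
  }
]
```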

Example: verify an AMQP publish

{
  "name": "Verify message was published",
  "action": { "type": "wait", "durationMs": 500 },
  "assertions": [
    {
      "assertions": [
        { "count": "$.messageLogs", "operator": "gte", "value": 1 },
        { "path": "$.messageLogs[0].operation", "operator": "eq", "value": "publish" },
        { "path": "$.messageLogs[0].routingKey", "operator": "eq", "value": "order-events" },
        { "path": "$.messageLogs[0].body.orderId", "operator": "eq", "value": 42 }
      ]
    }
  ]
}

Example: verify a Kafka produce

{
  "name": "Verify message was produced to Kafka",
  "action": { "type": "wait", "durationMs": 500 },
  "assertions": [
    {
      "assertions": [
        { "count": "$.messageLogs", "operator": "gte", "value": 1 },
        { "path": "$.messageLogs[0].operation", "operator": "eq", "value": "produce" },
        { "path": "$.messageLogs[0].topic", "operator": "eq", "value": "order-events" },
        { "path": "$.messageLogs[0].body.orderId", "operator": "eq", "value": 42 }
      ]
    }
  ]
}

Example: match block for filtering

{
  "name": "Find publish to specific exchange",
  "action": { "type": "wait", "durationMs": 500 },
  "assertions": [
    {
      "match": {
        "path": "$.messageLogs",
        "where": [
          { "path": "$$.operation", "operator": "eq", "value": "publish" },
          { "path": "$$.exchange", "operator": "eq", "value": "events" }
        ],
        "count": 1
      },
      "assertions": [
        { "path": "$.match.body.type", "operator": "eq", "value": "order.created" }
      ]
    }
  ]
}
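
The same match pattern should carry over to Kafka entries. The sketch below assumes that where and $.match behave exactly as in the AMQP example above; the topic name, message key, and expected body are hypothetical.

```json
{
  "name": "Find consume from specific topic",
  "action": { "type": "wait", "durationMs": 500 },
  "assertions": [
    {
      "match": {
        "path": "$.messageLogs",
        "where": [
          { "path": "$$.operation", "operator": "eq", "value": "consume" },
          { "path": "$$.topic", "operator": "eq", "value": "order-events" }
        ],
        "count": 1
      },
      "assertions": [
        { "path": "$.match.key", "operator": "eq", "value": "order-42" },
        { "path": "$.match.body.orderId", "operator": "eq", "value": 42 }
      ]
    }
  ]
}
```

Filtering on operation first keeps the produce and consume entries for the same message from being counted together.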