Waterstream Quick Start with Azure Event Hubs
=============================================

This guide explains how to run Waterstream with Azure Event Hubs using the Kafka endpoint, including all currently supported authentication modes:

- SASL/PLAIN with Event Hubs connection string (SAS)
- SASL/OAUTHBEARER with Microsoft Entra ID (client secret)
- SASL/OAUTHBEARER with default Azure credential chain
- SASL/OAUTHBEARER with managed identity (Azure VM)

Prerequisites
-------------

- `Azure subscription `_
- `Azure CLI `_ (``az``), logged in:

  .. code-block:: bash

     az login

- `jq `_
- `Docker `_
- (Optional) `mosquitto-clients `_ for MQTT smoke tests
- A Waterstream license file (``waterstream.license``)

Create Azure resources
----------------------

Use Azure CLI to provision all required resources.

Event Hubs plan vs Waterstream functionality
--------------------------------------------

The matrix below summarizes how Azure Event Hubs plan selection affects Waterstream features. It is based on current Waterstream behavior (QoS 2 relies on Kafka transactions for its strongest guarantees) and the Azure Event Hubs plan capabilities referenced by this guide.

Microsoft references used for this section:

- `Apache Kafka protocol support in Azure Event Hubs `_
- `Azure Event Hubs tier comparison `_

.. list-table:: Event Hubs plan compatibility matrix
   :widths: 14 16 16 24
   :header-rows: 1

   * - Plan
     - Kafka endpoint
     - Waterstream QoS 0/1
     - Waterstream QoS 2 with Kafka transactions
   * - Basic
     - No
     - Not supported
     - Not supported
   * - Standard
     - Yes
     - Supported
     - Not supported
   * - Premium
     - Yes
     - Supported
     - Supported (public preview)
   * - Dedicated
     - Yes
     - Supported
     - Supported (public preview)

.. note::

   If your target is strict QoS 2 consistency based on Kafka transactions, use Premium or Dedicated and validate preview status in the latest Microsoft docs.

Azure CLI setup
~~~~~~~~~~~~~~~

Set variables:

.. code-block:: bash

   # Optional: keep these in a local .env file and source it before running commands
   export AZURE_REGION=westeurope
   export RESOURCE_GROUP=waterstream-test-rg
   export EVENTHUBS_NAMESPACE=waterstream-test-ns
   export SP_NAME=waterstream-test-sp
   export EVENTHUBS_SKU=Standard

Create resource group and namespace:

.. code-block:: bash

   az group create \
     --name "$RESOURCE_GROUP" \
     --location "$AZURE_REGION"

   az eventhubs namespace create \
     --name "$EVENTHUBS_NAMESPACE" \
     --resource-group "$RESOURCE_GROUP" \
     --sku "$EVENTHUBS_SKU" \
     --enable-kafka true

.. note::

   Use ``EVENTHUBS_SKU=Standard`` for basic Kafka endpoint usage.
   Use ``EVENTHUBS_SKU=Premium`` (or Dedicated) when QoS 2 must use Kafka transactions.

.. warning::

   Keep ``AZURE_CLIENT_SECRET`` and SAS connection strings in local-only files. Do not commit them to source control.

Create Event Hubs (topics):

.. code-block:: bash

   for topic in \
     mqtt_sessions \
     mqtt_retained_messages \
     mqtt_connections \
     mqtt_messages \
     waterstream-kafka-table-mqtt_sessions-changelog; do
     az eventhubs eventhub create \
       --name "$topic" \
       --namespace-name "$EVENTHUBS_NAMESPACE" \
       --resource-group "$RESOURCE_GROUP" \
       --partition-count 5
   done

Create an Entra app + service principal + client secret:

.. code-block:: bash

   APP_ID=$(az ad app create --display-name "$SP_NAME" --output json | jq -r '.appId')
   az ad sp create --id "$APP_ID" --output none

   SECRET_JSON=$(az ad app credential reset --id "$APP_ID" --years 1 --output json)
   export AZURE_TENANT_ID=$(echo "$SECRET_JSON" | jq -r '.tenant')
   export AZURE_CLIENT_ID="$APP_ID"
   export AZURE_CLIENT_SECRET=$(echo "$SECRET_JSON" | jq -r '.password')

Assign Event Hubs role:

.. code-block:: bash

   NAMESPACE_ID=$(az eventhubs namespace show \
     --name "$EVENTHUBS_NAMESPACE" \
     --resource-group "$RESOURCE_GROUP" \
     --output json | jq -r '.id')

   SP_OBJECT_ID=$(az ad sp show --id "$APP_ID" --output json | jq -r '.id')

   az role assignment create \
     --assignee-object-id "$SP_OBJECT_ID" \
     --assignee-principal-type ServicePrincipal \
     --role "Azure Event Hubs Data Owner" \
     --scope "$NAMESPACE_ID"

For SAS auth, get the namespace connection string:

.. code-block:: bash

   export SAS_CONNECTION_STRING=$(az eventhubs namespace authorization-rule keys list \
     --resource-group "$RESOURCE_GROUP" \
     --namespace-name "$EVENTHUBS_NAMESPACE" \
     --name RootManageSharedAccessKey \
     --output json | jq -r '.primaryConnectionString')

Set bootstrap endpoint:

.. code-block:: bash

   export EVENTHUBS_BOOTSTRAP="${EVENTHUBS_NAMESPACE}.servicebus.windows.net:9093"

Run Waterstream with Event Hubs
-------------------------------

All examples below use the public image:

.. code-block:: bash

   export WS_IMAGE="waterstreamio/waterstream-kafka:|release|"

.. note::

   Starting from Waterstream 1.6.0, ``waterstreamio/waterstream-kafka`` is a multi-arch image manifest (amd64 + arm64).

QoS 2 behavior on Event Hubs plans
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Waterstream uses Kafka transactions to provide the strongest QoS 2 guarantees.

- With Premium or Dedicated plans, configure a unique stable ``KAFKA_TRANSACTIONAL_ID`` per Waterstream instance.
- With Standard plan, keep ``KAFKA_TRANSACTIONAL_ID`` unset/empty and treat QoS 2 as non-transactional.

.. note::

   Per Microsoft Learn, Kafka transactions are currently listed as public preview on Event Hubs Premium and Dedicated.

Example (transactional QoS 2 mode, Premium or Dedicated):

.. code-block:: bash

   # \$ConnectionString keeps the literal user name "$ConnectionString" that Event Hubs expects
   docker run -d --rm \
     --name waterstream-kafka-qos2-tx \
     -p 1883:1883 \
     -e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
     -e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
     -e KAFKA_SASL_MECHANISM=PLAIN \
     -e KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=\"${SAS_CONNECTION_STRING}\";" \
     -e KAFKA_TRANSACTIONAL_ID="ws-${HOSTNAME:-node}-01" \
     -e KAFKA_ENABLE_IDEMPOTENCE=true \
     -e MQTT_PORT=1883 \
     -e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
     -e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
     -e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
     -e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
     -v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
     "$WS_IMAGE"

1) SASL/PLAIN (SAS connection string)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use this mode when authenticating with Event Hubs SAS credentials.

.. code-block:: bash

   # \$ConnectionString keeps the literal user name "$ConnectionString" that Event Hubs expects
   docker run -d --rm \
     --name waterstream-kafka-sasl \
     -p 1883:1883 \
     -e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
     -e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
     -e KAFKA_SASL_MECHANISM=PLAIN \
     -e KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=\"${SAS_CONNECTION_STRING}\";" \
     -e MQTT_PORT=1883 \
     -e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
     -e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
     -e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
     -e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
     -v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
     "$WS_IMAGE"

2) SASL/OAUTHBEARER (Entra ID client secret)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This mode uses the Waterstream Event Hubs callback handler: ``io.waterstream.kafka.auth.EventHubsCallbackHandler``.

.. code-block:: bash

   docker run -d --rm \
     --name waterstream-kafka-oauth-clientsecret \
     -p 1883:1883 \
     -e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
     -e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
     -e KAFKA_SASL_MECHANISM=OAUTHBEARER \
     -e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
     -e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="client-secret";' \
     -e AZURE_TENANT_ID="$AZURE_TENANT_ID" \
     -e AZURE_CLIENT_ID="$AZURE_CLIENT_ID" \
     -e AZURE_CLIENT_SECRET="$AZURE_CLIENT_SECRET" \
     -e MQTT_PORT=1883 \
     -e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
     -e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
     -e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
     -e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
     -v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
     "$WS_IMAGE"

3) SASL/OAUTHBEARER (default credential chain)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This mode uses:

.. code-block:: text

   azure.credential.type="default"

The callback handler relies on Azure Identity ``DefaultAzureCredential``.

.. code-block:: bash

   docker run -d --rm \
     --name waterstream-kafka-oauth-default \
     -p 1883:1883 \
     -e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
     -e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
     -e KAFKA_SASL_MECHANISM=OAUTHBEARER \
     -e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
     -e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="default";' \
     -e AZURE_TENANT_ID="${AZURE_TENANT_ID:-}" \
     -e AZURE_CLIENT_ID="${AZURE_CLIENT_ID:-}" \
     -e AZURE_CLIENT_SECRET="${AZURE_CLIENT_SECRET:-}" \
     -e MQTT_PORT=1883 \
     -e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
     -e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
     -e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
     -e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
     -v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
     "$WS_IMAGE"

4) SASL/OAUTHBEARER with managed identity (Azure VM)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For managed identity, run Waterstream on an Azure VM with system-assigned or user-assigned identity.

Create a Linux VM with system-assigned managed identity:

.. code-block:: bash

   export VM_NAME=waterstream-test-vm
   export VM_ADMIN_USERNAME=azureuser

   az vm create \
     --resource-group "$RESOURCE_GROUP" \
     --name "$VM_NAME" \
     --location "$AZURE_REGION" \
     --image Ubuntu2404 \
     --size Standard_B2s \
     --admin-username "$VM_ADMIN_USERNAME" \
     --generate-ssh-keys \
     --assign-identity \
     --public-ip-sku Standard \
     --nsg-rule SSH

   VM_PUBLIC_IP=$(az vm show -d \
     --resource-group "$RESOURCE_GROUP" \
     --name "$VM_NAME" \
     --query publicIps -o tsv)

Assign Event Hubs role to the VM system-assigned identity:

.. code-block:: bash

   NAMESPACE_ID=$(az eventhubs namespace show \
     --name "$EVENTHUBS_NAMESPACE" \
     --resource-group "$RESOURCE_GROUP" \
     --query id -o tsv)

   VM_SYSTEM_IDENTITY_PRINCIPAL_ID=$(az vm show \
     --resource-group "$RESOURCE_GROUP" \
     --name "$VM_NAME" \
     --query identity.principalId -o tsv)

   az role assignment create \
     --assignee-object-id "$VM_SYSTEM_IDENTITY_PRINCIPAL_ID" \
     --assignee-principal-type ServicePrincipal \
     --role "Azure Event Hubs Data Owner" \
     --scope "$NAMESPACE_ID"

Optional: create and attach user-assigned identity:

.. code-block:: bash

   export VM_UAMI_NAME=waterstream-vm-uami

   az identity create \
     --name "$VM_UAMI_NAME" \
     --resource-group "$RESOURCE_GROUP" \
     --location "$AZURE_REGION"

   VM_USER_ASSIGNED_IDENTITY_ID=$(az identity show \
     --name "$VM_UAMI_NAME" \
     --resource-group "$RESOURCE_GROUP" \
     --query id -o tsv)

   VM_USER_ASSIGNED_IDENTITY_CLIENT_ID=$(az identity show \
     --name "$VM_UAMI_NAME" \
     --resource-group "$RESOURCE_GROUP" \
     --query clientId -o tsv)

   VM_USER_ASSIGNED_IDENTITY_PRINCIPAL_ID=$(az identity show \
     --name "$VM_UAMI_NAME" \
     --resource-group "$RESOURCE_GROUP" \
     --query principalId -o tsv)

   az vm identity assign \
     --resource-group "$RESOURCE_GROUP" \
     --name "$VM_NAME" \
     --identities "$VM_USER_ASSIGNED_IDENTITY_ID"

   az role assignment create \
     --assignee-object-id "$VM_USER_ASSIGNED_IDENTITY_PRINCIPAL_ID" \
     --assignee-principal-type ServicePrincipal \
     --role "Azure Event Hubs Data Owner" \
     --scope "$NAMESPACE_ID"

Install Docker on the VM, copy your license file, then SSH into the VM and run one of the following. ``EVENTHUBS_BOOTSTRAP`` and ``WS_IMAGE`` must also be set in the VM shell, with the same values as above.

System-assigned identity:

.. code-block:: bash

   sudo docker run -d --rm \
     --name waterstream-kafka-mi \
     -p 1883:1883 \
     -e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
     -e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
     -e KAFKA_SASL_MECHANISM=OAUTHBEARER \
     -e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
     -e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="managed-identity";' \
     -e MQTT_PORT=1883 \
     -e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
     -e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
     -e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
     -e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
     -v ~/waterstream.license:/etc/waterstream.license:ro \
     "$WS_IMAGE"

User-assigned identity (set the identity's client ID, ``VM_USER_ASSIGNED_IDENTITY_CLIENT_ID`` from the previous step, as the ``azure.managed.identity.client.id`` value in JAAS):

.. code-block:: bash

   sudo docker run -d --rm \
     --name waterstream-kafka-mi \
     -p 1883:1883 \
     -e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
     -e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
     -e KAFKA_SASL_MECHANISM=OAUTHBEARER \
     -e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
     -e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="managed-identity" azure.managed.identity.client.id="";' \
     -e MQTT_PORT=1883 \
     -e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
     -e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
     -e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
     -e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
     -v ~/waterstream.license:/etc/waterstream.license:ro \
     "$WS_IMAGE"

Verify and smoke test
---------------------

Check container logs:

.. code-block:: bash

   # Run the command that matches the container you started; -f follows the log until Ctrl+C
   docker logs -f waterstream-kafka-sasl
   docker logs -f waterstream-kafka-oauth-clientsecret
   docker logs -f waterstream-kafka-oauth-default

For OAUTHBEARER modes, a successful callback handler initialization log contains:

.. code-block:: text

   EventHubsCallbackHandler configured: credential=, scope=https://.servicebus.windows.net/.default

Run MQTT smoke test:

.. code-block:: bash

   # Terminal 1: subscribe (blocks and prints incoming messages)
   mosquitto_sub -h localhost -p 1883 -t 'test/#' -v

   # Terminal 2: publish a test message
   mosquitto_pub -h localhost -p 1883 -t test/hello -m world

Teardown
--------

Delete the resource group:

.. code-block:: bash

   az group delete --name "$RESOURCE_GROUP" --yes --no-wait

Optionally delete the Entra application registration if you created one for this quickstart:

.. code-block:: bash

   az ad app delete --id "$AZURE_CLIENT_ID"

Known limitations and operational notes
---------------------------------------

For the full limitations matrix (impact, workaround, and tier mitigation), see :ref:`azure-event-hubs-limitations`.

- Azure Event Hubs Kafka endpoint compatibility is not the same as a full Apache Kafka cluster. For Waterstream, create required Event Hubs explicitly (including ``waterstream-kafka-table-mqtt_sessions-changelog`` for default streams app/topic names).
- Plan limitations:

  - Basic: unusable for Waterstream because Kafka protocol endpoint is not available.
  - Standard: Kafka connectivity works, but Kafka transactions required for strongest QoS 2 semantics are not available.
  - Premium and Dedicated: required when QoS 2 must run with Kafka transaction-backed guarantees (currently documented as public preview).

- Waterstream QoS 2 is transaction-dependent for strongest guarantees. If ``KAFKA_TRANSACTIONAL_ID`` is empty or unset, transactions are disabled and QoS 2 consistency is weaker. If set, it must be unique and stable per Waterstream instance.
- OAUTHBEARER support for Event Hubs in Waterstream requires:

  - ``KAFKA_SASL_MECHANISM=OAUTHBEARER``
  - ``KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler``
  - a JAAS string including ``azure.credential.type``

- For ``azure.credential.type="client-secret"``, all these environment variables must be non-empty:

  - ``AZURE_TENANT_ID``
  - ``AZURE_CLIENT_ID``
  - ``AZURE_CLIENT_SECRET``

- For ``azure.credential.type="managed-identity"``, Waterstream must run in an Azure environment where managed identity metadata is reachable. This mode does not work on a regular local machine.
- Waterstream derives the Entra scope from the first bootstrap server value. If multiple bootstrap entries are set, only the first entry is used for scope derivation.
- After role assignments (service principal or managed identity), Azure RBAC propagation may take time. Short-lived authentication failures can occur immediately after provisioning.
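
The scope derivation note above can be illustrated with a small shell sketch. It is not Waterstream's implementation: it assumes the scope has the form ``https://<first-bootstrap-host>/.default``, which matches the shape of the callback handler log line in the verification section, and the function name ``derive_entra_scope`` is hypothetical.

.. code-block:: bash

   # Hypothetical helper, not part of Waterstream: shows which bootstrap
   # entry would determine the Entra scope under the assumption stated above.
   derive_entra_scope() {
     local first_entry="${1%%,*}"     # keep only the first comma-separated entry
     local host="${first_entry%%:*}"  # drop the :port suffix
     printf 'https://%s/.default\n' "$host"
   }

   # Only the first entry contributes to the derived scope.
   derive_entra_scope "example-ns.servicebus.windows.net:9093,other-ns.servicebus.windows.net:9093"

If the printed scope does not name the namespace you assigned roles on, reorder the bootstrap list so that the intended namespace comes first.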