Waterstream Features
====================

Waterstream offers a few features beyond the standard MQTT specification requirements.

Some of them are experimental: we are still collecting feedback on them,
and future development may break compatibility.
Share your feedback on such features on the support forum to help shape their future: https://discuss.waterstream.io/

(Experimental) Start subscription from the specified timestamp
--------------------------------------------------------------

From Waterstream version ``1.4.10``. Requires MQTT v5.

Typically, when a client subscribes to an MQTT topic, it doesn't receive the messages that were
published before the subscription (except for one retained message per topic).
If you want to read older messages, you may specify the user property ``kafkaTopicStartTimestamp``,
which contains a comma-separated list of pairs: ``kafkaTopic: startTimestamp``,
where ``startTimestamp`` is a Unix timestamp in milliseconds (i.e. milliseconds since the start of the epoch).

For example, using the Mosquitto clients you could do something like this:

.. code-block:: bash

    #Get timestamp in milliseconds
    $ date +%s"000"
    1668412470000
    #Send some messages before subscribing
    $ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub1"
    $ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub2"
    #Subscribe with the custom start timestamp
    $ mosquitto_sub -h localhost -p 1883 -t "#" -V 5 -D SUBSCRIBE user-property kafkaTopicStartTimestamp 'mqtt_messages:1668412470000' -v
    foo before sub1
    foo before sub2
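
The property value can also be assembled in a script. The following is a minimal sketch rather than a part of Waterstream:
the helper name ``rewind_property`` and the 10-minute window are illustrative assumptions,
and ``mqtt_messages`` is the Kafka topic from the example above.

.. code-block:: bash

    # Hypothetical helper: builds "topic1:ts,topic2:ts" for kafkaTopicStartTimestamp.
    # $1 - start timestamp in milliseconds, remaining arguments - Kafka topic names.
    rewind_property() {
        local ts="$1"
        shift
        local out="" topic
        for topic in "$@"; do
            # Prepend a comma only when the list already has an entry
            out="${out:+${out},}${topic}:${ts}"
        done
        printf '%s\n' "$out"
    }

    # Start 10 minutes back: seconds since the epoch minus 600, converted to milliseconds
    START_MS=$(( ($(date +%s) - 600) * 1000 ))
    rewind_property "$START_MS" mqtt_messages

The printed value can then be passed to ``mosquitto_sub`` in the same way as above:
``-D SUBSCRIBE user-property kafkaTopicStartTimestamp "$(rewind_property "$START_MS" mqtt_messages)"``.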

The environment variable ``REWIND_MAX_DEPTH_SECONDS`` limits how far back in time MQTT clients can rewind,
in order to avoid excessive consumption by misbehaving clients. The default is ``5184000`` (60 days).
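
To see how far back a client can currently rewind, the limit can be converted into a Unix timestamp in milliseconds.
This is a minimal client-side sketch: the helper name ``oldest_rewind_ms`` is hypothetical, and the fallback value
assumes that the default limit is in effect on the server.

.. code-block:: bash

    # Hypothetical helper: oldest start timestamp (in milliseconds) still within the rewind limit.
    # $1 - current time in seconds since the epoch, $2 - value of REWIND_MAX_DEPTH_SECONDS
    oldest_rewind_ms() {
        echo $(( ($1 - $2) * 1000 ))
    }

    # Uses the default of 5184000 seconds (60 days) when the variable is not set locally
    oldest_rewind_ms "$(date +%s)" "${REWIND_MAX_DEPTH_SECONDS:-5184000}"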

Limitations:

- If the client is already reading from the specified Kafka topic
  (that is, the client has subscribed to an MQTT topic mapped to that Kafka topic),
  then this parameter has no effect and new MQTT topic subscriptions start from the current moment.
  Otherwise, it would be hard to avoid duplicate message delivery.


.. _validate-messages:

(Experimental) Validate messages with the Confluent Schema Registry
-------------------------------------------------------------------

Starting from version ``1.4.19``, Waterstream can validate JSON messages
against an Avro schema retrieved from the `Confluent Schema Registry <https://docs.confluent.io/platform/current/schema-registry/index.html>`_.
The following configuration parameters control the validation:

- ``VALIDATION_MQTT_TOPIC_SCHEMAS`` - mapping between the MQTT topics and validation schemas.
  Currently only supports retrieving the Avro schema from the Confluent Schema Registry by subject name.
  The MQTT topic pattern may include the ``+`` and ``#`` wildcards - single-level and multi-level, respectively.
  Example value: ``[{"mqttPattern": "foo", "subject": "foo-value"}, {"mqttPattern": "bar/#", "subject": "bar-value"}]``
- ``VALIDATION_SCHEMA_REGISTRY_URL`` - URL of the Schema Registry server
- ``VALIDATION_SCHEMA_REGISTRY_BASIC_USERNAME``, ``VALIDATION_SCHEMA_REGISTRY_BASIC_PASSWORD`` - credentials for
  basic authentication with the Schema Registry. The default is empty - no authentication.
- ``VALIDATION_LATEST_SCHEMA_TTL_SECONDS`` - interval between refreshes of the latest schema, in seconds. The default is 60.
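
As an illustration, the parameters above could be set together as environment variables.
The URL, the credentials and the 5-minute refresh interval are hypothetical values,
and how the variables reach Waterstream depends on your deployment.

.. code-block:: bash

    # Hypothetical values; adjust them to your deployment
    export VALIDATION_SCHEMA_REGISTRY_URL='http://schema-registry.example.com:8081'
    export VALIDATION_SCHEMA_REGISTRY_BASIC_USERNAME='waterstream'
    export VALIDATION_SCHEMA_REGISTRY_BASIC_PASSWORD='change-me'
    # Check for a newer schema version every 5 minutes instead of the default 60 seconds
    export VALIDATION_LATEST_SCHEMA_TTL_SECONDS=300
    # Same mapping as in the example value above
    export VALIDATION_MQTT_TOPIC_SCHEMAS='[{"mqttPattern": "foo", "subject": "foo-value"}, {"mqttPattern": "bar/#", "subject": "bar-value"}]'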

At the moment, validation is only possible with a schema retrieved from the Schema Registry by its
subject name. Waterstream refreshes schemas every ``VALIDATION_LATEST_SCHEMA_TTL_SECONDS`` seconds to check whether a new
version exists. The last fetched schema is used for the validation.
If the schema with the specified subject is not available, then all the messages in the corresponding MQTT topics
are considered invalid. If none of the patterns specified in ``mqttPattern`` matches the MQTT topic, then
Waterstream skips the validation and the message is considered valid.
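
To predict which topics a given ``mqttPattern`` covers, the wildcard rules can be sketched in plain shell.
This is an illustration that assumes Waterstream follows the standard MQTT wildcard semantics; it is not
Waterstream's own code, the function name ``mqtt_pattern_matches`` is hypothetical, and the special handling of
topics starting with ``$`` is left out.

.. code-block:: bash

    # Returns 0 when the MQTT topic matches the pattern, 1 otherwise.
    mqtt_pattern_matches() {
        local pattern="$1" topic="$2" plevel tlevel
        while :; do
            plevel="${pattern%%/*}"   # current level of the pattern
            tlevel="${topic%%/*}"     # current level of the topic
            # "#" matches this level and everything below it
            [ "$plevel" = "#" ] && return 0
            # "+" matches any single level; otherwise the levels must be equal
            [ "$plevel" = "+" ] || [ "$plevel" = "$tlevel" ] || return 1
            case "$pattern" in
                */*) pattern="${pattern#*/}" ;;
                *)   # pattern exhausted: match only if the topic is exhausted too
                     case "$topic" in */*) return 1 ;; *) return 0 ;; esac ;;
            esac
            case "$topic" in
                */*) topic="${topic#*/}" ;;
                *)   # topic exhausted: only a trailing "#" still matches ("bar/#" vs "bar")
                     [ "$pattern" = "#" ] && return 0 || return 1 ;;
            esac
        done
    }

    # Patterns from the example value of VALIDATION_MQTT_TOPIC_SCHEMAS
    mqtt_pattern_matches "bar/#" "bar/baz/qux" && echo "bar/baz/qux: validated against bar-value"
    mqtt_pattern_matches "foo" "foo/bar" || echo "foo/bar: not covered by the foo pattern"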

|

Validation expects the MQTT message body and Kafka message body to be in the JSON format. At the moment, it does not do any
conversion. In particular, it does not add a magic byte and schema ID as the Avro serializers/deserializers for Kafka do.

|

How Waterstream deals with invalid messages depends on the MQTT protocol version.
If an MQTT 3.x client publishes a message with an invalid body, it gets disconnected, as this version of MQTT has no other error reporting mechanism.
MQTT 5 clients, in contrast, stay connected after an invalid message, as this version of MQTT can report an error and gracefully reject a message.
If an MQTT 5 client sends an invalid message with QoS 1 or 2, it gets ``PUBACK`` or ``PUBREC`` respectively with the
reason code ``0x83`` (implementation specific error).

|

The following example shows how to add validation for a particular MQTT topic:

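1. Upload a schema to the Schema Registry:

   .. code-block:: bash

       # Subject under which the schema is registered (referenced again in step 2)
       SUBJECT=foobar-value
       FOOBAR_SCHEMA='{"namespace": "io.waterstream.test",
                       "type": "record",
                       "name": "FooBarRecord",
                       "fields": [
                           {"name": "count", "type": "int"},
                           {"name": "message", "type": "string"}
                       ]
                      }'
       # Encode the schema as a JSON string so that it can be embedded into the request body
       ESCAPED_SCHEMA=$(echo "${FOOBAR_SCHEMA}" | jq -RsaM .)
       SCHEMA_WRAPPER="{\"schema\": ${ESCAPED_SCHEMA}}"

       # Replace the placeholder with the URL of your Schema Registry server
       curl -v -X POST "<your_schema_registry_server>/subjects/${SUBJECT}/versions" -H "Content-Type: application/json" -d "${SCHEMA_WRAPPER}"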


2. Configure Waterstream:

   - set ``VALIDATION_SCHEMA_REGISTRY_URL`` to point to the Schema Registry server and
     provide credentials if needed,

   - map the MQTT topics to the validation schemas: ``VALIDATION_MQTT_TOPIC_SCHEMAS='[{"mqttPattern": "foo/bar", "subject": "foobar-value"}]'``,

   - restart Waterstream.

3. Check that validation works: send sample messages to the ``foo/bar`` topic and check whether they are delivered.
   This message should be delivered: ``{"count": 10, "message": "hello, validation"}``, and this one should be rejected:
   ``{"message": "bye, validation"}``.
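
The check in the last step could be scripted with the Mosquitto clients. This is a minimal sketch that assumes
Waterstream listens on ``localhost:1883`` as in the subscription example earlier; the helper name ``publish_sample``
is hypothetical. It uses MQTT 5 and QoS 1 so that the client stays connected when a message is rejected.

.. code-block:: bash

    # Sample payloads from step 3
    VALID_MSG='{"count": 10, "message": "hello, validation"}'
    INVALID_MSG='{"message": "bye, validation"}'

    # Hypothetical helper: publishes one payload to foo/bar over MQTT 5 with QoS 1.
    # The -d flag enables the client's debug messages.
    publish_sample() {
        mosquitto_pub -h localhost -p 1883 -V 5 -q 1 -d -t foo/bar -m "$1"
    }

With ``mosquitto_sub -h localhost -p 1883 -t foo/bar -v`` running in another terminal,
``publish_sample "$VALID_MSG"`` should result in a delivered message, while
``publish_sample "$INVALID_MSG"`` should not.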