
Consumer

The Fluvio Consumer is the component that reads and processes records from topics within the Fluvio streaming platform. It is designed to be flexible and robust, enabling applications to ingest data in real time or in batch mode with fine-grained control over partition selection, output formatting, and offset management.


Capabilities

  • Record Consumption:
    The Consumer retrieves records from a specified topic and partition. By default, it reads from partition 0 but can be directed to any partition or even all partitions simultaneously.

  • Flexible Reading Modes:

    • From the Beginning or Latest: Consumers can start reading at the beginning of the log (using the -B flag) or from the end of the log (the default), picking up only new records. A minimal Rust sketch after this list shows the equivalent API usage.
    • Batch vs. Continuous Consumption: The --disable-continuous (-d) flag tells the consumer to exit after reading all available records, making it well suited to batch processing.
  • SmartModule Integration:
    Similar to producers, consumers can integrate SmartModules (WASM modules) to perform inline transformations or filtering on records before they are delivered to the application.

  • Custom Output Formatting:
    With the --format flag, consumers can tailor the output display by embedding placeholders (e.g., {{key}}, {{value}}, {{partition}}, {{offset}}, and {{time}}) in a format string to suit their logging or processing needs.

  • Offset Management:
    Consumers maintain a persistent pointer for each topic partition. This offset ensures that upon restart or recovery, the consumer can resume reading from the correct position in the log.
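
The same capabilities are available programmatically through the Rust client. The sketch below is a minimal example built on the consumer_with_config API used later on this page; the topic name is illustrative, and Offset::end() could be used instead of Offset::beginning() to read only new records.

use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};
use futures_util::StreamExt;

async fn basic_consume() -> anyhow::Result<()> {
    // Connect to the cluster referenced by the current Fluvio profile.
    let fluvio = Fluvio::connect().await?;

    // Build a consumer stream that starts at the beginning of the log.
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                .offset_start(Offset::beginning())
                .build()?,
        )
        .await?;

    // Print each record's value as it arrives.
    while let Some(Ok(record)) = stream.next().await {
        println!("{}", String::from_utf8_lossy(record.as_ref()));
    }
    Ok(())
}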


Partition Consumption Strategies

Single Partition Consumption

By default, when you run a command like:

$ fluvio consume my-topic -B -d

the consumer reads from partition 0. This is suitable for topics where records are funneled into a single partition or when targeted consumption is required.

Specifying a Partition

To target a specific partition, use the --partition (-p) flag:

$ fluvio consume my-topic -B --partition 1

This command directs the consumer to read from partition 1.
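
The same targeting is possible from the Rust client. This is a sketch under the assumption that ConsumerConfigExtBuilder exposes a partition setter (as in recent versions of the fluvio crate); the topic and partition values are illustrative.

use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};
use futures_util::StreamExt;

async fn consume_partition(fluvio: &Fluvio) -> anyhow::Result<()> {
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                // Read only from partition 1 (assumed setter; omit to use the default selection).
                .partition(1)
                .offset_start(Offset::beginning())
                .build()?,
        )
        .await?;

    while let Some(Ok(record)) = stream.next().await {
        println!("{}", String::from_utf8_lossy(record.as_ref()));
    }
    Ok(())
}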

Consuming from All Partitions

To aggregate records from every partition of a topic, use the -A flag, which directs the consumer to read across all partitions:

$ fluvio consume my-topic -B -A

Note: When consuming from multiple partitions, there is no guarantee of record ordering between partitions.


SmartModule Integration for Consumers

Fluvio SmartModules provide an additional layer of processing by allowing you to filter or transform records in-flight. For example, you might use a SmartModule to filter out records that do not meet certain criteria:

$ fluvio consume my-topic -B --smartmodule-path="fluvio_wasm_filter.wasm"

Alternatively, after registering a SmartModule with the cluster:

$ fluvio smartmodule create --wasm-file="fluvio_wasm_filter.wasm" my_filter

you can apply it by name:

$ fluvio consume my-topic -B --smartmodule="my_filter"

This integration allows for powerful, on-the-fly processing without altering the core consumer logic.


Custom Output Formatting

The consumer’s default output prints only the record values. For richer debugging or logging, the --key-value flag displays both keys and values:

$ fluvio consume my-topic -B --key-value
[null] This is my first record ever
[alice] Alice In Wonderland

Moreover, you can define a custom format using the --format option. For instance, to print records as CSV rows:

$ fluvio consume my-topic -B --format="{{time}},{{partition}},{{offset}},{{key}},{{value}}"
2022-05-04T15:35:49.244Z,0,0,null,This is my first record ever
2022-05-04T15:35:49.244Z,0,1,null,This is my second record ever
2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland

This flexibility in output formatting empowers users to integrate Fluvio into various monitoring, debugging, and processing workflows.
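
When consuming through the Rust API rather than the CLI, the same fields are available on each record. The following is a minimal sketch, assuming the get_key/get_value accessors and the as_utf8_lossy_string helper provided by recent versions of the fluvio crate; the topic name is illustrative.

use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};
use futures_util::StreamExt;

async fn print_key_value(fluvio: &Fluvio) -> anyhow::Result<()> {
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                .offset_start(Offset::beginning())
                .build()?,
        )
        .await?;

    while let Some(Ok(record)) = stream.next().await {
        // Keys are optional, so get_key() returns an Option; print "null" when absent,
        // mirroring the CLI's --key-value output.
        let key = record.get_key().map(|k| k.as_utf8_lossy_string());
        let value = record.get_value().as_utf8_lossy_string();
        println!("[{}] {}", key.unwrap_or_else(|| "null".into()), value);
    }
    Ok(())
}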


Consumer Offsets and Commit Strategies

A key aspect of any streaming system is the management of consumer offsets. In Fluvio, these offsets serve as durable bookmarks that track which records have been processed:

  • Persistence and Durability:
    Offsets are stored persistently per topic partition. They survive cluster restarts and upgrades, ensuring that consumers can reliably resume consumption.

  • Commit Strategies:
    Fluvio supports two primary offset management strategies:

    1. Manual Offset Management:
    The application explicitly commits offsets after processing records. This provides full control over when an offset is considered “committed.”

    Example in Rust:

    use fluvio::{
        consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
        Fluvio, Offset,
    };
    use futures_util::StreamExt;

    async fn manual_consume(fluvio: &Fluvio) -> anyhow::Result<()> {
        let mut stream = fluvio
            .consumer_with_config(
                ConsumerConfigExtBuilder::default()
                    .topic("my-topic".to_string())
                    .offset_consumer("my-consumer".to_string())
                    .offset_start(Offset::beginning())
                    .offset_strategy(OffsetManagementStrategy::Manual)
                    .build()?,
            )
            .await?;

        while let Some(Ok(record)) = stream.next().await {
            println!("{}", String::from_utf8_lossy(record.as_ref()));
            // Mark the current record as processed, then push the commit to the cluster.
            stream.offset_commit()?;
            stream.offset_flush().await?;
        }
        Ok(())
    }
    2. Auto Offset Management:
      Offsets are committed automatically as records are consumed, reducing the burden on the developer.

      Example in Rust:

      use fluvio::{
          consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
          Fluvio, Offset,
      };
      use futures_util::StreamExt;

      async fn auto_consume(fluvio: &Fluvio) -> anyhow::Result<()> {
          let mut stream = fluvio
              .consumer_with_config(
                  ConsumerConfigExtBuilder::default()
                      .topic("my-topic".to_string())
                      .offset_consumer("my-consumer".to_string())
                      .offset_start(Offset::beginning())
                      .offset_strategy(OffsetManagementStrategy::Auto)
                      .build()?,
              )
              .await?;

          // Offsets are committed and flushed automatically as records are consumed.
          while let Some(Ok(record)) = stream.next().await {
              println!("{}", String::from_utf8_lossy(record.as_ref()));
          }
          Ok(())
      }
  • Consumer Identity and Listing:
    Each consumer is identified by a unique name, specified with the -c flag on the CLI or with the offset_consumer setting in code. You can list and manage consumers using the Fluvio CLI:

    $ fluvio consumer list
    CONSUMER  TOPIC        PARTITION  OFFSET  LAST SEEN
    c1        hello-topic  0          1       4m 14s

    Consumers can also be deleted as needed:

    $ fluvio consumer delete c1

This offset management system ensures that applications can reliably resume processing where they left off, even in the face of restarts or failures.


Summary

The Fluvio Consumer abstracts the complexities of reading and processing streaming data with features that include:

  • Flexible Partition Consumption:
    Read from single or multiple partitions with configurable starting offsets.

  • Dynamic Data Transformation:
    Integrate SmartModules for real-time filtering or transformation of records.

  • Customizable Output:
    Format output to suit application needs using the --key-value and --format options.

  • Robust Offset Management:
    Choose between manual and automatic strategies to maintain accurate, persistent consumption state.

By leveraging these capabilities, the Fluvio Consumer provides a powerful, scalable way to build reactive applications that respond to real-time data streams.

For further details and practical examples, see the fluvio consume and fluvio consumer commands in the CLI reference.