Consumer
The Fluvio Consumer is the component that reads and processes records from topics within the Fluvio streaming platform. It is designed to be flexible and robust, enabling applications to ingest data in real time or in batch mode with fine-grained control over partition selection, output formatting, and offset management.
Capabilities
- Record Consumption: The consumer retrieves records from a specified topic and partition. By default, it reads from partition 0, but it can be directed to any partition or to all partitions simultaneously (a programmatic sketch follows this list).
- Flexible Reading Modes:
  - From the Beginning or Latest: Consumers can start reading at the beginning of the log (using flags such as `--from-beginning` or `-B`) or from the current offset.
  - Batch vs. Continuous Consumption: The `--disable-continuous` (`-d`) flag allows the consumer to exit after reading all available records, making it ideal for batch processing.
- SmartModule Integration: Similar to producers, consumers can integrate SmartModules (WASM modules) to perform inline transformations or filtering on records before they are delivered to the application.
- Custom Output Formatting: With the `--format` flag, consumers can tailor the output display by embedding placeholders (e.g., `{{key}}`, `{{value}}`, `{{partition}}`, `{{offset}}`, and `{{time}}`) in a format string to suit their logging or processing needs.
- Offset Management: Consumers maintain a persistent pointer for each topic partition. This offset ensures that upon restart or recovery, the consumer can resume reading from the correct position in the log.
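The same capabilities are available programmatically through the Fluvio Rust client. The following is a minimal sketch, assuming a reachable cluster in the active profile and an existing topic named my-topic; it mirrors the configuration style used in the offset-management examples later in this section:
use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};
use futures_util::StreamExt;

async fn basic_consume() -> anyhow::Result<()> {
    // Connect to the cluster configured in the current Fluvio profile
    let fluvio = Fluvio::connect().await?;

    // Build a consumer stream that starts at the beginning of the log
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                .offset_start(Offset::beginning())
                .build()?,
        )
        .await?;

    // Print each record value as it arrives
    while let Some(Ok(record)) = stream.next().await {
        println!("{}", String::from_utf8_lossy(record.as_ref()));
    }
    Ok(())
}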
Partition Consumption Strategies
Single Partition Consumption
By default, when you run a command like:
$ fluvio consume my-topic -B -d
the consumer reads from partition 0. This is suitable for topics where records are funneled into a single partition or when targeted consumption is required.
Specifying a Partition
To target a specific partition, use the `--partition` (`-p`) flag:
$ fluvio consume my-topic -B --partition 1
This command directs the consumer to read from partition 1.
Consuming from All Partitions
For cases where you want to aggregate records from every partition, the `-A` flag allows the consumer to consume records across all partitions:
$ fluvio consume my-topic -B -A
Note: When consuming from multiple partitions, there is no guarantee of record ordering between partitions.
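The same partition targeting is available from application code. The sketch below is a rough programmatic equivalent of `--partition 1`; it assumes the consumer config builder exposes a partition(...) setter, as in recent Rust client releases, so verify the exact method name against your client version:
use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};
use futures_util::StreamExt;

async fn consume_partition_one(fluvio: &Fluvio) -> anyhow::Result<()> {
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                // Assumed setter: restrict the stream to partition 1 only
                .partition(1)
                .offset_start(Offset::beginning())
                .build()?,
        )
        .await?;

    while let Some(Ok(record)) = stream.next().await {
        println!("{}", String::from_utf8_lossy(record.as_ref()));
    }
    Ok(())
}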
SmartModule Integration for Consumers
Fluvio SmartModules provide an additional layer of processing by allowing you to filter or transform records in-flight. For example, you might use a SmartModule to filter out records that do not meet certain criteria:
$ fluvio consume my-topic -B --smartmodule-path="fluvio_wasm_filter.wasm"
Alternatively, after registering a SmartModule with the cluster:
$ fluvio smartmodule create --wasm-file="fluvio_wasm_filter.wasm" my_filter
you can apply it by name:
$ fluvio consume my-topic -B --smartmodule="my_filter"
This integration allows for powerful, on-the-fly processing without altering the core consumer logic.
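For context, a filter SmartModule such as fluvio_wasm_filter.wasm is itself a small Rust crate compiled to WebAssembly. The following is a minimal sketch in the fluvio-smartmodule crate's documented filter style (attribute and type names can differ between crate versions); it assumes record values are UTF-8 text and keeps only records containing the letter 'a':
use fluvio_smartmodule::{smartmodule, Record, Result};

/// Pass through only records whose value contains the letter 'a'.
#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
    let value = std::str::from_utf8(record.value.as_ref())?;
    Ok(value.contains('a'))
}
Once built to a `.wasm` artifact (for example with `smdk build`), the result is what `--smartmodule-path` or `fluvio smartmodule create --wasm-file` expects.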
Custom Output Formatting
The consumer’s default output prints only the record values. For richer debugging or logging, the `--key-value` flag displays both keys and values:
$ fluvio consume my-topic -B --key-value
[null] This is my first record ever
[alice] Alice In Wonderland
Moreover, you can define a custom format using the `--format` option. For instance, to print records as CSV rows:
$ fluvio consume my-topic -B --format="{{time}},{{partition}},{{offset}},{{key}},{{value}}"
2022-05-04T15:35:49.244Z,0,0,null,This is my first record ever
2022-05-04T15:35:49.244Z,0,1,null,This is my second record ever
2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland
This flexibility in output formatting empowers users to integrate Fluvio into various monitoring, debugging, and processing workflows.
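When consuming from application code rather than the CLI, you assemble the same fields yourself. The sketch below reproduces the `--key-value` style display; it assumes the consumer record type exposes get_key()/get_value() accessors with as_utf8_lossy_string(), as in the Rust client's published examples, so adjust the accessor names for your client version:
use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};
use futures_util::StreamExt;

async fn key_value_consume(fluvio: &Fluvio) -> anyhow::Result<()> {
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                .offset_start(Offset::beginning())
                .build()?,
        )
        .await?;

    while let Some(Ok(record)) = stream.next().await {
        // Assumed accessors: get_key() returns an Option, get_value() the payload
        let key = record.get_key().map(|k| k.as_utf8_lossy_string());
        let value = record.get_value().as_utf8_lossy_string();
        // Mirrors the CLI's `--key-value` display: "[<key>] <value>"
        println!("[{}] {}", key.as_deref().unwrap_or("null"), value);
    }
    Ok(())
}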
Consumer Offsets and Commit Strategies
A key aspect of any streaming system is the management of consumer offsets. In Fluvio, these offsets serve as durable bookmarks that track which records have been processed:
- Persistence and Durability: Offsets are stored persistently per topic partition. They survive cluster restarts and upgrades, ensuring that consumers can reliably resume consumption.
- Commit Strategies: Fluvio supports two primary offset management strategies:
  - Manual Offset Management: The application explicitly commits offsets after processing records. This provides full control over when an offset is considered “committed.” Example in Rust:
use fluvio::{
    consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
    Fluvio, Offset,
};
use futures_util::StreamExt;

async fn manual_consume(fluvio: &Fluvio) -> anyhow::Result<()> {
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                .offset_consumer("my-consumer".to_string())
                .offset_start(Offset::beginning())
                .offset_strategy(OffsetManagementStrategy::Manual)
                .build()?,
        )
        .await?;

    while let Some(Ok(record)) = stream.next().await {
        println!("{}", String::from_utf8_lossy(record.as_ref()));
        // Mark this record's offset as committed locally...
        stream.offset_commit()?;
        // ...then flush the committed offset to the cluster so it survives restarts
        stream.offset_flush().await?;
    }

    Ok(())
}
  - Auto Offset Management: Offsets are committed automatically as records are consumed, reducing the burden on the developer. Example in Rust:
use fluvio::{
    consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
    Fluvio, Offset,
};
use futures_util::StreamExt;

async fn auto_consume(fluvio: &Fluvio) -> anyhow::Result<()> {
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                .offset_consumer("my-consumer".to_string())
                .offset_start(Offset::beginning())
                .offset_strategy(OffsetManagementStrategy::Auto)
                .build()?,
        )
        .await?;

    // With the Auto strategy, offsets are committed and flushed behind the scenes
    // as records are consumed; no explicit commit calls are needed here.
    while let Some(Ok(record)) = stream.next().await {
        println!("{}", String::from_utf8_lossy(record.as_ref()));
    }

    Ok(())
}
- Consumer Identity and Listing: Each consumer is identified by a unique name (specified via the CLI with `-c` or in code). You can list and manage consumers using the Fluvio CLI:

$ fluvio consumer list
  CONSUMER  TOPIC        PARTITION  OFFSET  LAST SEEN
  c1        hello-topic  0          1       4m 14s

Consumers can also be deleted as needed:

$ fluvio consumer delete c1

This robust offset management system ensures that applications can reliably process streams of data without loss or duplication, even in the face of failures.
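To run either of the Rust examples above end to end, wrap them in an async entry point. A minimal sketch, assuming async-std (with its attributes feature) as the executor (Tokio's `#[tokio::main]` works similarly) and the auto_consume function shown earlier:
use fluvio::Fluvio;

#[async_std::main]
async fn main() -> anyhow::Result<()> {
    // Connect using the active Fluvio profile, then hand the client to the consumer loop
    let fluvio = Fluvio::connect().await?;
    auto_consume(&fluvio).await
}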
Summary
The Fluvio Consumer abstracts the complexities of reading and processing streaming data with features that include:
- Flexible Partition Consumption: Read from single or multiple partitions with configurable starting offsets.
- Dynamic Data Transformation: Integrate SmartModules for real-time filtering or transformation of records.
- Customizable Output: Format output to suit application needs using the `--key-value` and `--format` options.
- Robust Offset Management: Choose between manual and automatic strategies to maintain accurate, persistent consumption state.
By leveraging these capabilities, the Fluvio Consumer provides a powerful, scalable way to build reactive applications that respond to real-time data streams.
For further details and practical examples, see the consumer section of the CLI reference.