Apache Flume Sink Processors
The sink processor is an important component of Apache Flume. Every sink group has a sink processor attached to it, which decides how events are handed to the sinks in that group. Sink processors provide two key features:
- Load balancing capabilities across the sinks in a group.
- Failover capabilities by maintaining an ordered list of sinks within the group.
Sink groups let users combine multiple sinks into a single entity. The group's sink processor is the mechanism that creates failover paths between those sinks, and it can also be configured to balance load across them.
There are three types of sink processors available:
- Default sink processor
- Failover sink processor
- Load balancing sink processor
Let us discuss each type of sink processor in detail.
Default sink processor -
If no sink processor is configured, the default sink processor is used. It accepts only a single sink, so users are not required to create a processor (sink group) for single sinks.
With the default processor, the agent follows the plain source-channel-sink pattern. An explicit sink group definition for it looks like this:
<agent_name>.sinkgroups = <sink_group>
<agent_name>.sinkgroups.<sink_group>.sinks = <sink1>
<agent_name>.sinkgroups.<sink_group>.processor.type = default
Example for agent named agt for the default sink processor type -
agt.sinkgroups = sg
agt.sinkgroups.sg.sinks = sn1
agt.sinkgroups.sg.processor.type = default
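Sink group settings are plain Java properties, so the single-sink rule of the default processor can be checked mechanically. The following is a minimal sketch only: `SinkGroupConfig`, `sinksOf`, `processorTypeOf` and `validate` are hypothetical names, not part of Flume's API, and this is not how Flume itself parses its configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not Flume's API): reads one sink group from
// Flume-style properties and enforces the default processor's single-sink rule.
final class SinkGroupConfig {

    static List<String> sinksOf(Properties props, String agent, String group) {
        String key = agent + ".sinkgroups." + group + ".sinks";
        String value = props.getProperty(key, "").trim();
        if (value.isEmpty()) {
            throw new IllegalArgumentException("No sinks configured under " + key);
        }
        return Arrays.asList(value.split("\\s+")); // the sink list is space-separated
    }

    static String processorTypeOf(Properties props, String agent, String group) {
        // processor.type falls back to "default" when it is not set.
        String key = agent + ".sinkgroups." + group + ".processor.type";
        return props.getProperty(key, "default").trim();
    }

    static void validate(Properties props, String agent, String group) {
        List<String> sinks = sinksOf(props, agent, group);
        if (processorTypeOf(props, agent, group).equals("default") && sinks.size() != 1) {
            throw new IllegalArgumentException(
                "The default sink processor accepts exactly one sink, got " + sinks);
        }
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
            "agt.sinkgroups = sg\n"
                + "agt.sinkgroups.sg.sinks = sn1\n"
                + "agt.sinkgroups.sg.processor.type = default\n"));
        validate(props, "agt", "sg");
        System.out.println(sinksOf(props, "agt", "sg")); // prints [sn1]
    }
}
```

A group typed as failover or load_balance passes the same check with several sinks; only the default type is restricted to one.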
The properties required to define a default sink processor are listed below.
Property Name | Default | Description |
---|---|---|
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to be default, failover or load_balance |
Failover sink processor -
The failover sink processor maintains a prioritized list of sinks, guaranteeing that events are delivered as long as at least one sink is available.
The failover mechanism works by relegating failed sinks to a pool where they are assigned a cooldown period. The cooldown grows with each consecutive failure before the sink is retried. Once a sink successfully sends an event, it is restored to the live pool.
Each sink has an associated priority, and a larger number means a higher priority. If a sink fails while sending an event, the sink with the next highest priority is tried next; for example, a sink with priority 100 is activated before a sink with priority 80. Priority values must be unique. If no priority is specified, it is determined by the order in which the sinks are listed in the configuration.
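The failover behaviour described above can be modelled in a few lines of Java. This is a simplified sketch, not Flume's `FailoverSinkProcessor`: the class `FailoverSketch` is hypothetical, time is passed in explicitly so the example is deterministic, and the cooldown formula (1000 ms × 2^n after the n-th consecutive failure, capped at the `maxpenalty` value) is an assumption chosen to illustrate a cooldown that grows with sequential failures.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

// Simplified failover model (hypothetical, not Flume's FailoverSinkProcessor).
// Assumed cooldown: min(maxPenalty, 1000 ms * 2^n) after the n-th consecutive failure.
final class FailoverSketch {

    private static final class SinkState {
        final String name;
        final int priority;
        int consecutiveFailures = 0;
        long retryAt = 0; // the sink is on cooldown while now < retryAt

        SinkState(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    private final List<SinkState> sinks = new ArrayList<>();
    private final long maxPenalty; // plays the role of processor.maxpenalty

    FailoverSketch(long maxPenalty) {
        this.maxPenalty = maxPenalty;
    }

    void addSink(String name, int priority) {
        for (SinkState s : sinks) {
            if (s.priority == priority) {
                throw new IllegalArgumentException("Priority " + priority + " is already used by " + s.name);
            }
        }
        sinks.add(new SinkState(name, priority));
        // A larger number means a higher priority, so keep the list in descending order.
        sinks.sort(Comparator.comparingInt((SinkState s) -> s.priority).reversed());
    }

    /** Tries live sinks in priority order at time `now`; returns the sink that delivered, or null. */
    String deliver(long now, Predicate<String> trySend) {
        for (SinkState s : sinks) {
            if (now < s.retryAt) {
                continue; // still cooling down
            }
            if (trySend.test(s.name)) {
                s.consecutiveFailures = 0; // success restores the sink to the live pool
                s.retryAt = 0;
                return s.name;
            }
            s.consecutiveFailures++;
            long penalty = Math.min(maxPenalty, 1000L << Math.min(s.consecutiveFailures, 30));
            s.retryAt = now + penalty;
        }
        return null; // every sink failed or is cooling down
    }

    public static void main(String[] args) {
        FailoverSketch group = new FailoverSketch(20000);
        group.addSink("sn1", 15);
        group.addSink("sn2", 20);
        System.out.println(group.deliver(0, name -> !name.equals("sn2"))); // prints sn1
        System.out.println(group.deliver(1000, name -> true)); // prints sn1
        System.out.println(group.deliver(2000, name -> true)); // prints sn2
    }
}
```

In `main`, `sn2` (the higher priority) fails once, so the event falls through to `sn1`; `sn2` then sits out its 2000 ms cooldown and takes over again once the cooldown has expired.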
<agent_name>.sinkgroups = <sink_group>
<agent_name>.sinkgroups.<sink_group>.sinks = <sink1> <sink2>
<agent_name>.sinkgroups.<sink_group>.processor.type = failover
<agent_name>.sinkgroups.<sink_group>.processor.priority.<sink1> = <sink1_priority_nbr>
<agent_name>.sinkgroups.<sink_group>.processor.priority.<sink2> = <sink2_priority_nbr>
<agent_name>.sinkgroups.<sink_group>.processor.maxpenalty = <max_penalty_nbr>
Example for agent named agt -
agt.sinkgroups = sg
agt.sinkgroups.sg.sinks = sn1 sn2
agt.sinkgroups.sg.processor.type = failover
agt.sinkgroups.sg.processor.priority.sn1 = 15
agt.sinkgroups.sg.processor.priority.sn2 = 20
agt.sinkgroups.sg.processor.maxpenalty = 20000
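To see which sink a failover configuration prefers, the priorities can be read back and sorted. The helper below (`FailoverOrder.tryOrder`) is a hypothetical sketch, not Flume code. In particular, giving sinks without an explicit priority the placeholder values -1, -2, ... in list order is an assumption made for illustration of "priority follows configuration order".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper (not part of Flume): returns the sinks of a failover
// group in the order they would be tried, highest priority first.
// Assumption: sinks without an explicit priority get placeholder values
// -1, -2, ... in list order, so they rank below any explicit priority >= 0.
final class FailoverOrder {

    static List<String> tryOrder(Properties props, String agent, String group) {
        String prefix = agent + ".sinkgroups." + group + ".";
        String sinkList = props.getProperty(prefix + "sinks", "").trim();
        if (sinkList.isEmpty()) {
            return new ArrayList<>();
        }
        // Descending key order: a larger number means a higher priority.
        TreeMap<Integer, String> byPriority = new TreeMap<>(Comparator.<Integer>reverseOrder());
        int nextImplicit = 0;
        for (String sink : sinkList.split("\\s+")) {
            String raw = props.getProperty(prefix + "processor.priority." + sink);
            int priority = (raw != null) ? Integer.parseInt(raw.trim()) : --nextImplicit;
            // Priorities must be unique within the group.
            if (byPriority.putIfAbsent(priority, sink) != null) {
                throw new IllegalArgumentException("Duplicate priority " + priority + " for sink " + sink);
            }
        }
        return new ArrayList<>(byPriority.values());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("agt.sinkgroups", "sg");
        props.setProperty("agt.sinkgroups.sg.sinks", "sn1 sn2");
        props.setProperty("agt.sinkgroups.sg.processor.type", "failover");
        props.setProperty("agt.sinkgroups.sg.processor.priority.sn1", "15");
        props.setProperty("agt.sinkgroups.sg.processor.priority.sn2", "20");
        props.setProperty("agt.sinkgroups.sg.processor.maxpenalty", "20000");
        System.out.println(tryOrder(props, "agt", "sg")); // prints [sn2, sn1]
    }
}
```

For the agt example, sn2 (priority 20) is tried before sn1 (priority 15).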
The properties required to define a failover sink processor are listed below.
Property Name | Default | Description |
---|---|---|
sinks | – | Space-separated list of sinks |
processor.type | default | needs to be failover |
processor.priority.<sinkName> | – | Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A sink with a higher value is activated earlier. |
processor.maxpenalty | 30000 | The maximum backoff period for a failed sink (in milliseconds). |
Load balancing sink processor -
The load balancing sink processor distributes the flow across multiple sinks. It maintains an indexed list of active sinks over which the load is distributed.
Two selection mechanisms are available for distributing the load:
- round_robin
- random
The round_robin mechanism is used by default unless another selector is configured. If the selected sink fails to deliver an event, the processor picks the next available sink using the same selection mechanism. Without backoff, the failing sink is not blacklisted: the processor keeps optimistically attempting every available sink, and only if all of them fail is the failure propagated to the sink runner.
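If backoff is enabled, the sink processor temporarily blacklists sinks that fail, excluding them from selection for a timeout period. If a sink is still unresponsive when its timeout ends, the timeout is increased exponentially, up to the limit set by processor.selector.maxTimeOut.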
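The selection and backoff behaviour can be modelled with a small Java class. `LoadBalanceSketch` is hypothetical and simplified, not Flume's load balancing processor: round_robin is assumed to advance the starting sink by one per event, random shuffles the order per event, and the blacklist timeout is a fixed `backoffMillis` rather than an exponentially growing one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Predicate;

// Simplified model of a load-balancing sink group (hypothetical, not Flume code).
final class LoadBalanceSketch {
    private final List<String> sinks;
    private final boolean roundRobin;  // selector: round_robin (default) or random
    private final boolean backoff;     // plays the role of processor.backoff
    private final long backoffMillis;  // fixed blacklist timeout, assumed for brevity
    private final Map<String, Long> blacklistedUntil = new HashMap<>();
    private final Random random = new Random();
    private int nextHead = 0;          // round_robin starting position

    LoadBalanceSketch(List<String> sinks, String selector, boolean backoff, long backoffMillis) {
        if (sinks.isEmpty()) {
            throw new IllegalArgumentException("A sink group needs at least one sink");
        }
        this.sinks = new ArrayList<>(sinks);
        this.roundRobin = !"random".equals(selector);
        this.backoff = backoff;
        this.backoffMillis = backoffMillis;
    }

    // Order in which the sinks are tried for one event.
    private List<String> selectionOrder() {
        List<String> order = new ArrayList<>(sinks);
        if (roundRobin) {
            Collections.rotate(order, -nextHead); // start at nextHead and wrap around
            nextHead = (nextHead + 1) % sinks.size();
        } else {
            Collections.shuffle(order, random);
        }
        return order;
    }

    /** Delivers one event at time `now`; returns the sink that accepted it. */
    String process(long now, Predicate<String> trySend) {
        for (String sink : selectionOrder()) {
            if (backoff && now < blacklistedUntil.getOrDefault(sink, 0L)) {
                continue; // blacklisted: excluded from selection until its timeout ends
            }
            if (trySend.test(sink)) {
                blacklistedUntil.remove(sink);
                return sink;
            }
            if (backoff) {
                blacklistedUntil.put(sink, now + backoffMillis);
            }
            // Without backoff the failed sink stays eligible for the next event.
        }
        throw new IllegalStateException("All sinks failed to deliver the event");
    }

    public static void main(String[] args) {
        LoadBalanceSketch group = new LoadBalanceSketch(List.of("sn1", "sn2"), "round_robin", true, 5000);
        System.out.println(group.process(0, s -> !s.equals("sn1"))); // prints sn2 (sn1 blacklisted until 5000)
        System.out.println(group.process(10, s -> true));   // prints sn2 (its turn)
        System.out.println(group.process(20, s -> true));   // prints sn2 (sn1 skipped)
        System.out.println(group.process(5000, s -> true)); // prints sn2 (its turn)
        System.out.println(group.process(5010, s -> true)); // prints sn1 (timeout over)
    }
}
```

With backoff disabled, a sink that failed on one event is simply tried again on the next; with backoff enabled, it is skipped until its timeout expires.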
<agent_name>.sinkgroups = <sink_group>
<agent_name>.sinkgroups.<sink_group>.sinks = <sink1> <sink2>
<agent_name>.sinkgroups.<sink_group>.processor.type = load_balance
<agent_name>.sinkgroups.<sink_group>.processor.backoff = true/false
<agent_name>.sinkgroups.<sink_group>.processor.selector = random/round_robin
Example for agent named agt -
agt.sinkgroups = sg
agt.sinkgroups.sg.sinks = sn1 sn2
agt.sinkgroups.sg.processor.type = load_balance
agt.sinkgroups.sg.processor.backoff = true
agt.sinkgroups.sg.processor.selector = random
The properties required to define a load balancing sink processor are listed below.
Property Name | Default | Description |
---|---|---|
sinks | – | Space-separated list of sinks |
processor.type | default | needs to be load_balance |
processor.backoff | false | Whether failed sinks should be backed off (temporarily blacklisted) |
processor.selector | round_robin | Selection mechanism. Must be either round_robin or random |
processor.selector.maxTimeOut | 30000 | Upper limit for the exponential backoff of failed sinks (in milliseconds) |
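The effect of processor.selector.maxTimeOut can be illustrated with a capped exponential schedule. The growth formula below (1000 ms × 2^n for the n-th consecutive failure) is an assumption for illustration; only the cap corresponds to the documented property, and `BackoffSchedule` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a capped exponential backoff schedule (assumed formula, not Flume's code):
// timeout(n) = min(maxTimeOut, 1000 ms * 2^n) for the n-th consecutive failure.
final class BackoffSchedule {

    static long timeoutMillis(int consecutiveFailures, long maxTimeOut) {
        if (consecutiveFailures < 1) {
            return 0; // a sink that has not failed is not blacklisted
        }
        // Clamp the shift so the long value cannot overflow.
        int shift = Math.min(consecutiveFailures, 30);
        return Math.min(maxTimeOut, 1000L << shift);
    }

    public static void main(String[] args) {
        long maxTimeOut = 30000; // documented default of processor.selector.maxTimeOut
        List<Long> schedule = new ArrayList<>();
        for (int n = 1; n <= 6; n++) {
            schedule.add(timeoutMillis(n, maxTimeOut));
        }
        System.out.println(schedule); // prints [2000, 4000, 8000, 16000, 30000, 30000]
    }
}
```

Without the cap, the fifth consecutive failure would already exceed 30 seconds; maxTimeOut keeps an unresponsive sink from being held back indefinitely long.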