Summary -

This topic describes sink processors in Apache Flume and how each type is configured.

The sink processor is another important component of Apache Flume. A sink processor is normally configured on a sink group, where it decides which sink in the group handles the events.

Sink Processors offer -

  • Load balancing capability over sinks of a group.
  • Failover capability by maintaining an ordered list of sinks within the group.

Sink groups allow users to group multiple sinks into one entity. The sink processor is the mechanism through which failover paths are created. It can also be configured for load balancing, so that the load is distributed across multiple sinks.

There are three types of sink processors -

  1. Default sink processor
  2. Failover sink processor
  3. Load balancing sink processor

Let us discuss each type of sink processor in detail.

Default sink processor -

If no sink processor is specified, the default sink processor is applied. The default sink processor accepts only a single sink. Users are not forced to create a sink processor (sink group) for a single sink.

Instead, the usual source-channel-sink pattern can be followed.

<agent_name>.sinkgroups = <sink_group>
<agent_name>.sinkgroups.<sink_group>.sinks = <sink1>
<agent_name>.sinkgroups.<sink_group>.processor.type = default

Example for agent named agt with the default sink processor type -

agt.sinkgroups = sg
agt.sinkgroups.sg.sinks = sn1
agt.sinkgroups.sg.processor.type = default

Below is the list of properties required to define the default sink processor.

Property Name     Default    Description
sinks             -          Space-separated list of sinks that are participating in the group
processor.type    default    The component type name, needs to be default, failover or load_balance

Failover sink processor -

The failover sink processor works by maintaining a prioritized list of sinks. It guarantees that events are delivered so long as at least one sink in the group is available.

If a sink fails, it is moved to a pool where it is assigned a cool-down period. The cool-down period increases with each sequential failure before the sink is retried.

Once a sink successfully sends an event, it is restored to the live pool. Every sink has a priority associated with it. The larger the number, the higher the priority.

If a sink fails to send an event, the available sink with the next highest priority is tried. If priorities are specified, each priority must be unique. If no priority is specified, priorities are assigned based on the order in which the sinks appear in the configuration.
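The priority-based selection described above can be sketched in a few lines of Python. This is only an illustration of the idea, not Flume's implementation: the `Sink` class, its `healthy` flag, and the `deliver` function are hypothetical names invented for this example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Sink:
    """Hypothetical stand-in for a Flume sink; not a real Flume class."""
    name: str
    priority: int
    healthy: bool = True

    def process(self, event: str) -> bool:
        # Return True when the event was delivered.
        return self.healthy


def deliver(sinks: List[Sink], event: str) -> Optional[str]:
    """Try sinks from highest to lowest priority.

    Returns the name of the sink that delivered the event,
    or None when every sink failed.
    """
    for sink in sorted(sinks, key=lambda s: s.priority, reverse=True):
        if sink.process(event):
            return sink.name
    return None


sinks = [Sink("sn1", 15), Sink("sn2", 20)]
first = deliver(sinks, "event-1")    # sn2 has the larger priority number
sinks[1].healthy = False             # simulate sn2 going down
second = deliver(sinks, "event-2")   # delivery falls over to sn1
print(first, second)
```

With priorities 15 and 20, the first event goes to sn2. Once sn2 is marked as failed, the next event goes to sn1.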

<agent_name>.sinkgroups = <sink_group>
<agent_name>.sinkgroups.<sink_group>.sinks = <sink1> <sink2>
<agent_name>.sinkgroups.<sink_group>.processor.type = failover
<agent_name>.sinkgroups.<sink_group>.processor.priority.<sink1> = sink1_priority_nbr
<agent_name>.sinkgroups.<sink_group>.processor.priority.<sink2> = sink2_priority_nbr
<agent_name>.sinkgroups.<sink_group>.processor.maxpenalty = max-penalty-nbr

Example for agent named agt -

agt.sinkgroups = sg
agt.sinkgroups.sg.sinks = sn1 sn2
agt.sinkgroups.sg.processor.type = failover
agt.sinkgroups.sg.processor.priority.sn1 = 15
agt.sinkgroups.sg.processor.priority.sn2 = 20
agt.sinkgroups.sg.processor.maxpenalty = 20000

Below is the list of properties required to define the failover sink processor.

Property Name                    Default    Description
sinks                            -          Space-separated list of sinks
processor.type                   default    Needs to be failover
processor.priority.<sinkName>    -          Priority value. <sinkName> must be one of the sink instances associated with the current sink group.
processor.maxpenalty             30000      The maximum backoff period for the failed sink (in milliseconds)
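To show how processor.maxpenalty limits the cool-down of a failing sink, here is a small Python sketch. The growth rule is an assumption made for illustration (the penalty doubles with every sequential failure, starting from a 1-second base), and the function name `cooldown_ms` and its parameters are hypothetical. The only part taken from the text above is that the cool-down grows with sequential failures and never exceeds the configured maximum.

```python
def cooldown_ms(sequential_failures: int,
                max_penalty_ms: int = 30000,
                base_ms: int = 1000) -> int:
    """Cool-down (in milliseconds) before a failed sink is retried.

    Assumption: the penalty doubles per sequential failure from base_ms.
    The result is capped at max_penalty_ms (processor.maxpenalty).
    """
    return min(max_penalty_ms, base_ms * (2 ** sequential_failures))


# Cool-downs for 1..6 sequential failures with maxpenalty = 20000
penalties = [cooldown_ms(n, max_penalty_ms=20000) for n in range(1, 7)]
print(penalties)  # [2000, 4000, 8000, 16000, 20000, 20000]
```

Under these assumptions the cool-down stops growing at the fifth failure, because the uncapped value (32000) would exceed the 20000 limit.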

Load balancing sink processor -

The load balancing sink processor provides the capability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks and distributes the load across them.

There are two selection mechanisms used to distribute the load.

  1. round_robin
  2. random

The round_robin mechanism is the default if nothing is specified. The mechanism can be overridden in the configuration. If a sink fails to send an event, the processor picks the next available sink to transfer the event.

By default, the failed sink is not disabled, and the processor keeps trying to balance the load over all sinks in the group. If backoff is enabled, the sink processor blacklists sinks that fail, removing them from selection for a given timeout.
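The round_robin selection and the effect of the backoff setting can be sketched in Python as follows. This is a simplified model rather than Flume's code: the class `RoundRobinSelector`, the helper `send`, the `down` set used to simulate failing sinks, and the doubling of the timeout from a 1-second start are all assumptions made for this example.

```python
class RoundRobinSelector:
    """Hypothetical round-robin selector with optional backoff."""

    def __init__(self, sink_names, backoff=False,
                 timeout_ms=1000, max_timeout_ms=30000):
        self.sink_names = list(sink_names)
        self.backoff = backoff
        self.timeout_ms = timeout_ms          # assumed initial timeout
        self.max_timeout_ms = max_timeout_ms  # upper limit for the timeout
        self.next_index = 0
        self.blocked_until = {}  # sink name -> time (ms) it may be used again
        self.failures = {}       # sink name -> consecutive failure count

    def candidates(self, now_ms):
        """Sinks in round-robin order, skipping blacklisted ones."""
        n = len(self.sink_names)
        order = [self.sink_names[(self.next_index + i) % n] for i in range(n)]
        self.next_index = (self.next_index + 1) % n
        return [s for s in order if self.blocked_until.get(s, 0) <= now_ms]

    def report_failure(self, name, now_ms):
        if not self.backoff:
            return  # without backoff the failed sink stays selectable
        count = self.failures.get(name, 0) + 1
        self.failures[name] = count
        delay = min(self.max_timeout_ms, self.timeout_ms * 2 ** (count - 1))
        self.blocked_until[name] = now_ms + delay

    def report_success(self, name):
        self.failures.pop(name, None)
        self.blocked_until.pop(name, None)


def send(selector, down, now_ms):
    """Deliver through the first candidate that is not in the `down` set."""
    for name in selector.candidates(now_ms):
        if name in down:
            selector.report_failure(name, now_ms)
            continue
        selector.report_success(name)
        return name
    return None  # every candidate failed


selector = RoundRobinSelector(["sn1", "sn2"], backoff=True)
a = send(selector, down=set(), now_ms=0)       # sn1
b = send(selector, down=set(), now_ms=0)       # sn2
c = send(selector, down={"sn1"}, now_ms=0)     # sn1 fails, sn2 takes over
d = send(selector, down=set(), now_ms=500)     # sn1 still blacklisted
e = send(selector, down=set(), now_ms=1500)    # timeout over, sn1 is back
print(a, b, c, d, e)
```

With backoff enabled, sn1 is skipped until its timeout has passed. With `backoff=False`, `report_failure` does nothing, so the failed sink is tried again on its next turn.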

<agent_name>.sinkgroups = <sink_group>
<agent_name>.sinkgroups.<sink_group>.sinks = <sink1> <sink2>
<agent_name>.sinkgroups.<sink_group>.processor.type = load_balance
<agent_name>.sinkgroups.<sink_group>.processor.backoff = true/false
<agent_name>.sinkgroups.<sink_group>.processor.selector = random/round_robin

Example for agent named agt -

agt.sinkgroups = sg
agt.sinkgroups.sg.sinks = sn1 sn2
agt.sinkgroups.sg.processor.type = load_balance
agt.sinkgroups.sg.processor.backoff = true
agt.sinkgroups.sg.processor.selector = random

Below is the list of properties required to define the load balancing sink processor.

Property Name                    Default        Description
sinks                            -              Space-separated list of sinks
processor.type                   default        Needs to be load_balance
processor.backoff                false          Whether failed sinks should be backed off
processor.selector               round_robin    Selection mechanism. Must be either round_robin or random
processor.selector.maxTimeOut    30000          Exponential backoff limit (in milliseconds)