Skip to main content
Version: 2.12

Batch Processor

The batch processor can be used to aggregate entries(logs/any data) and process them in a batch. When the batch_max_size is set to zero the processor will execute each entry immediately. Setting the batch max size more than 1 will start aggregating the entries until it reaches the max size or the timeout expires.

Configurations#

The only mandatory parameter to create a batch processor is a function. The function will be executed when the batch reaches the max size or when the buffer duration exceeds.

NameRequirementDescription
nameoptionalA unique identifier to identity the batch processor
batch_max_sizeoptionalMax size of each batch, default is 1000
inactive_timeoutoptionalmaximum age in seconds when the buffer will be flushed if inactive, default is 5s
buffer_durationoptionalMaximum age in seconds of the oldest entry in a batch before the batch must be processed, default is 5
max_retry_countoptionalMaximum number of retries before removing from the processing pipe line; default is zero
retry_delayoptionalNumber of seconds the process execution should be delayed if the execution fails; default is 1

The following code shows an example of how to use batch processor in your plugin:

local bp_manager_mod = require("apisix.utils.batch-processor-manager")...
local plugin_name = "xxx-logger"local batch_processor_manager = bp_manager_mod.new(plugin_name)local schema = {...}local _M = {    ...    name = plugin_name,    schema = batch_processor_manager:wrap_schema(schema),}
...

function _M.log(conf, ctx)    local entry = {...} -- data to log
    if batch_processor_manager:add_entry(conf, entry) then        return    end    -- create a new processor if not found
    -- entries is an array table of entry, which can be processed in batch    local func = function(entries)        -- serialize to json array core.json.encode(entries)        -- process/send data        return true        -- return false, err_msg if failed    end    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)end

The batch processor's configuration will be set inside the plugin's configuration. For example:

curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '{      "plugins": {            "http-logger": {                "uri": "http://mockbin.org/bin/:ID",                "batch_max_size": 10,                "max_retry_count": 1            }       },      "upstream": {           "type": "roundrobin",           "nodes": {               "127.0.0.1:1980": 1           }      },      "uri": "/hello"}'

If your plugin only uses one global batch processor, you can also use the processor directly:

local entry = {...} -- data to logif log_buffer then    log_buffer:push(entry)    returnend
local config_bat = {    name = config.name,    retry_delay = config.retry_delay,    ...}
local err-- entries is an array table of entry, which can be processed in batchlocal func = function(entries)    ...    return true    -- return false, err_msg if failedendlog_buffer, err = batch_processor:new(func, config_bat)
if not log_buffer then    core.log.warn("error when creating the batch processor: ", err)    returnend
log_buffer:push(entry)

Note: Please make sure the batch max size (entry count) is within the limits of the function execution. The timer to flush the batch runs based on the inactive_timeout configuration. Thus, for optimal usage, keep the inactive_timeout smaller than the buffer_duration.