Batch Processor
The batch processor can be used to aggregate entries (logs or any other data) and process them in a batch. When batch_max_size is set to 1, the processor executes each entry immediately. Setting batch_max_size to a value greater than 1 makes the processor aggregate entries until the batch reaches the max size or the timeout expires.
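The aggregation behavior described above can be sketched in plain Lua. This is a toy model, not the APISIX implementation: `new_batcher`, `push`, and `flush` are hypothetical names, and the explicit `flush()` call at the end stands in for the timeout-driven flush.

```lua
-- Toy size-triggered batcher (illustration only, not APISIX's code).
local function new_batcher(batch_max_size, func)
    local buffer = {}
    local self = {}

    -- hand the buffered entries to func and start a fresh buffer
    function self.flush()
        if #buffer == 0 then
            return
        end
        local entries = buffer
        buffer = {}
        func(entries)
    end

    -- buffer one entry; flush as soon as the batch is full
    function self.push(entry)
        buffer[#buffer + 1] = entry
        if #buffer >= batch_max_size then
            self.flush()
        end
    end

    return self
end

local batches = {}
local batcher = new_batcher(3, function(entries)
    batches[#batches + 1] = entries
end)

for i = 1, 7 do
    batcher.push("entry-" .. i)
end
batcher.flush() -- stands in for the timeout-driven flush
```

With a max size of 3 and seven entries, the function receives two full batches and one final partial batch. With a max size of 1, every `push` would call the function immediately.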
Configurations
The only mandatory parameter to create a batch processor is a function. The function is executed when the batch reaches the max size or when the buffer duration is exceeded.
Name | Type | Requirement | Default | Valid | Description |
---|---|---|---|---|---|
name | string | optional | logger's name | ["http logger",...] | A unique identifier for the batch processor. Defaults to the name of the logger plugin that calls the batch processor; for example, for the plugin "http logger" the name is "http logger". |
batch_max_size | integer | optional | 1000 | [1,...] | Maximum number of logs sent in each batch. When the number of buffered logs reaches this value, all of them are automatically pushed to the HTTP/HTTPS service. |
inactive_timeout | integer | optional | 5 | [1,...] | Maximum time to refresh the buffer (in seconds). When this time is reached, all buffered logs are automatically pushed to the HTTP/HTTPS service, regardless of whether the number of logs in the buffer has reached batch_max_size. |
buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed. |
max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing the entry from the processing pipeline when an error occurs. |
retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
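As a quick illustration of how the defaults in the table combine with user-supplied values, here is a minimal plain-Lua sketch. The `with_defaults` helper is hypothetical and only mirrors the table above; it is not how APISIX itself applies schema defaults.

```lua
-- Defaults copied from the table above.
local defaults = {
    batch_max_size = 1000,
    inactive_timeout = 5,
    buffer_duration = 60,
    max_retry_count = 0,
    retry_delay = 1,
}

-- Hypothetical helper: values in conf override the defaults.
local function with_defaults(conf)
    local merged = {}
    for key, value in pairs(defaults) do
        merged[key] = value
    end
    for key, value in pairs(conf) do
        merged[key] = value
    end
    return merged
end

local conf = with_defaults({ batch_max_size = 10, max_retry_count = 1 })
```

Here `conf` keeps the two overridden values and takes `inactive_timeout`, `buffer_duration`, and `retry_delay` from the defaults.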
The following code shows an example of how to use the batch processor in your plugin:
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
...

local plugin_name = "xxx-logger"
local batch_processor_manager = bp_manager_mod.new(plugin_name)
local schema = {...}
local _M = {
    ...
    name = plugin_name,
    schema = batch_processor_manager:wrap_schema(schema),
}

...

function _M.log(conf, ctx)
    local entry = {...} -- data to log

    if batch_processor_manager:add_entry(conf, entry) then
        return
    end
    -- create a new processor if not found

    -- entries is an array table of entry, which can be processed in batch
    local func = function(entries)
        -- serialize to json array core.json.encode(entries)
        -- process/send data
        return true
        -- return false, err_msg, first_fail if failed
        -- first_fail(optional) indicates first_fail-1 entries have been successfully processed
        -- and during processing of entries[first_fail], the error occurred. So the batch processor
        -- only retries for the entries having index >= first_fail as per the retry policy.
    end
    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end
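The `first_fail` contract described in the comments above can be illustrated with a small, self-contained sketch. `process_with_retry` is a hypothetical, synchronous stand-in written for this example; the real batch processor schedules retries with timers and `retry_delay`, which this sketch leaves out.

```lua
-- Illustration only: a synchronous stand-in for the retry policy.
local function process_with_retry(entries, func, max_retry_count)
    local attempts = 0
    while true do
        local ok, err, first_fail = func(entries)
        if ok then
            return true, attempts
        end
        if attempts >= max_retry_count then
            return false, err
        end
        attempts = attempts + 1
        if first_fail and first_fail > 1 then
            -- entries[1 .. first_fail-1] already succeeded; keep only the rest
            local remaining = {}
            for i = first_fail, #entries do
                remaining[#remaining + 1] = entries[i]
            end
            entries = remaining
        end
    end
end

local sent = {}
local fail_once = true

-- Processing function that fails once on entry "c" and reports its index.
local func = function(entries)
    for i, entry in ipairs(entries) do
        if entry == "c" and fail_once then
            fail_once = false
            return false, "simulated failure", i
        end
        sent[#sent + 1] = entry
    end
    return true
end

local ok, retries = process_with_retry({"a", "b", "c", "d"}, func, 1)
```

Because the function reports `first_fail = 3`, the retry only receives `"c"` and `"d"`, so `"a"` and `"b"` are not sent twice. Without `first_fail`, the whole batch would be retried.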
The batch processor's configuration will be set inside the plugin's configuration. For example:
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "plugins": {
        "http-logger": {
            "uri": "http://mockbin.org/bin/:ID",
            "batch_max_size": 10,
            "max_retry_count": 1
        }
    },
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    },
    "uri": "/hello"
}'
If your plugin only uses one global batch processor, you can also use the processor directly:
local entry = {...} -- data to log
if log_buffer then
    log_buffer:push(entry)
    return
end

local config_bat = {
    name = config.name,
    retry_delay = config.retry_delay,
    ...
}

local err
-- entries is an array table of entry, which can be processed in batch
local func = function(entries)
    ...
    return true
    -- return false, err_msg, first_fail if failed
end
log_buffer, err = batch_processor:new(func, config_bat)

if not log_buffer then
    core.log.warn("error when creating the batch processor: ", err)
    return
end

log_buffer:push(entry)
Note: Make sure the batch max size (entry count) stays within what the processing function can handle in a single execution.
The timer to flush the batch runs based on the inactive_timeout configuration. Thus, for optimal usage, keep the inactive_timeout smaller than the buffer_duration.
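To see why this ordering matters, consider a simplified model in which the flush condition is only checked when the timer fires, every inactive_timeout seconds, and entries keep arriving so that only buffer_duration can trigger a flush. Both the model and the `first_flush_time` helper are assumptions made for illustration, not APISIX code.

```lua
-- Simplified model (assumption): the flush check runs only on timer ticks,
-- which fire every inactive_timeout seconds after the first entry arrives.
local function first_flush_time(inactive_timeout, buffer_duration)
    local t = 0
    repeat
        t = t + inactive_timeout
    until t >= buffer_duration
    return t
end

local on_time = first_flush_time(5, 60)  -- timer interval shorter than buffer_duration
local late = first_flush_time(90, 60)    -- timer interval longer than buffer_duration
```

In this model, a 5-second timer flushes the batch exactly at the 60-second limit, while a 90-second timer does not notice the expired batch until 90 seconds have passed.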