diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 6a83075cc5..852f2723bd 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -36,7 +36,7 @@ class TailInput < Fluent::Plugin::Input helpers :timer, :event_loop, :parser, :compat_parameters RESERVED_CHARS = ['/', '*', '%'].freeze - MetricsInfo = Struct.new(:opened, :closed, :rotated) + MetricsInfo = Struct.new(:opened, :closed, :rotated, :throttled) class WatcherSetupError < StandardError def initialize(msg) @@ -208,7 +208,8 @@ def configure(conf) opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files") closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files") rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files") - @metrics = MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics) + throttling_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_throttled_total", help_text: "Total number of times throttling occurs per file when throttling enabled") + @metrics = MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics, throttling_metrics) end def configure_tag @@ -793,6 +794,7 @@ def statistics 'opened_file_count' => @metrics.opened.get, 'closed_file_count' => @metrics.closed.get, 'rotated_file_count' => @metrics.rotated.get, + 'throttled_log_count' => @metrics.throttled.get, }) } stats @@ -1192,8 +1194,10 @@ def should_shutdown_now? end def handle_notify - return if limit_bytes_per_second_reached? - return if group_watcher&.limit_lines_reached?(@path) + if limit_bytes_per_second_reached? || group_watcher&.limit_lines_reached?(@path) + @metrics.throttled.inc + return + end with_io do |io| begin @@ -1220,6 +1224,7 @@ def handle_notify if group_watcher_limit || limit_bytes_per_second_reached? || should_shutdown_now? # Just get out from tailing loop. + @metrics.throttled.inc if group_watcher_limit || limit_bytes_per_second_reached? read_more = false break end diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index f305fdbdc3..5dd37dc4b8 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -15,7 +15,9 @@ def setup closed_file_metrics.configure(config_element('metrics', '', {})) rotated_file_metrics = Fluent::Plugin::LocalMetrics.new rotated_file_metrics.configure(config_element('metrics', '', {})) - @metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics) + throttling_metrics = Fluent::Plugin::LocalMetrics.new + throttling_metrics.configure(config_element('metrics', '', {})) + @metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics, throttling_metrics) yield end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index bb16c1ab8e..f9e3b1a0c5 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1878,13 +1878,6 @@ def test_z_refresh_watchers plugin.instance_eval do @pf = Fluent::Plugin::TailInput::PositionFile.load(sio, EX_FOLLOW_INODES, {}, logger: $log) @loop = Coolio::Loop.new - opened_file_metrics = Fluent::Plugin::LocalMetrics.new - opened_file_metrics.configure(config_element('metrics', '', {})) - closed_file_metrics = Fluent::Plugin::LocalMetrics.new - closed_file_metrics.configure(config_element('metrics', '', {})) - rotated_file_metrics = Fluent::Plugin::LocalMetrics.new - rotated_file_metrics.configure(config_element('metrics', '', {})) - @metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics) end Timecop.freeze(2010, 1, 2, 3, 4, 5) do @@ -2245,15 +2238,6 @@ def test_should_close_watcher_after_rotate_wait config = common_follow_inode_config + config_element('', '', {"rotate_wait" => "1s", "limit_recently_modified" => "1s"}) d = create_driver(config, false) - d.instance.instance_eval do - opened_file_metrics = Fluent::Plugin::LocalMetrics.new - opened_file_metrics.configure(config_element('metrics', '', {})) - closed_file_metrics = Fluent::Plugin::LocalMetrics.new - closed_file_metrics.configure(config_element('metrics', '', {})) - rotated_file_metrics = Fluent::Plugin::LocalMetrics.new - rotated_file_metrics.configure(config_element('metrics', '', {})) - @metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics) - end Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1"