out_http: Add option to reuse connections #4330
Hi @daipom,
Thanks for this enhancement. I'm currently working on releasing fluent-package v5.0.2, which contains Fluentd v1.16.3. Since this PR adds a new option, it will be released in v1.17.0 (fluent-package v5.1.0).
Sorry for my late response. It does not affect the existing specifications. However, there are a few points that concern me. I will comment on them later.
Sorry for my late response and the overall change request.
It looks good, but I believe we can use simpler logic.
Could you please consider whether this kind of simpler logic is possible?
It is becoming difficult to maintain Fluentd due to the complexity of the code.
I want to keep the code as simple as possible.
Here are the main points.
- Avoid `nil` initialization of the cache array.
- Avoid managing the array index with mutex.
- Share the logic of `make_request` by using the `&block` argument.
If you have any concerns or ideas, please let me know.
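The third point can be illustrated with a minimal sketch. It relies on the documented behaviour of `Net::HTTP.start`: called with a block, it yields the open connection and closes it afterwards; called without a block, it returns the open connection to the caller. `FakeHTTP` below is a hypothetical stand-in so the sketch runs without a network, and the `make_request(host, port, &block)` signature is illustrative, not the plugin's actual one.

```ruby
# Hypothetical stand-in for Net::HTTP so the sketch runs without a network.
class FakeHTTP
  def initialize(host, port)
    @host = host
    @port = port
    @started = false
  end

  # Same shape as Net::HTTP.start: with a block, yield the open connection,
  # close it afterwards, and return the block's value; without a block,
  # return the open connection to the caller.
  def self.start(host, port)
    conn = new(host, port)
    conn.connect
    return conn unless block_given?

    begin
      yield conn
    ensure
      conn.finish
    end
  end

  def connect
    @started = true
  end

  def started?
    @started
  end

  def finish
    @started = false
  end

  def request(req)
    raise IOError, "connection closed" unless started?

    "response to #{req}"
  end
end

# One helper serves both the one-shot path and the reuse path.
def make_request(host, port, &block)
  FakeHTTP.start(host, port, &block)
end

# One-shot path: the connection is closed when the block returns.
one_shot = make_request("example.test", 80) { |http| http.request("POST /a") }

# Reuse path: no block is given, so the caller keeps the open connection.
kept = make_request("example.test", 80)
reused = [kept.request("POST /b"), kept.request("POST /c")]
kept.finish
```

With this shape, the cached path only has to decide when to open and close a connection; how a connection is opened (proxy or not) stays in one place.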
lib/fluent/plugin/out_http.rb
Outdated
@cache = []
@cache_id_mutex = Mutex.new
@cache_entry = Struct.new(:uri, :conn)
I believe that we can manage these caches more simply.
- @cache = []
- @cache_id_mutex = Mutex.new
- @cache_entry = Struct.new(:uri, :conn)
+ @cache_by_thread = {}
Why do you define the struct in the constructor?
It would be normal to define it directly under the class.
class HTTPOutput < Output
  Fluent::Plugin.register_output('http', self)

  class RetryableResponse < StandardError; end

  CacheEntry = Struct.new(:uri, :conn)
  ...
> Why do you define the struct in the constructor?
> It would be normal to define it directly under the class.
I fixed this in d57402e
lib/fluent/plugin/out_http.rb
Outdated
# Close all open connections
@cache.each {|entry| entry.conn.finish if entry.conn&.started? }
  # Close all open connections
- @cache.each {|entry| entry.conn.finish if entry.conn&.started? }
+ @cache_by_thread.each_value do |cache|
+   cache.conn.finish if cache.conn.started?
+ end
lib/fluent/plugin/out_http.rb
Outdated
@cache = Array.new(actual_flush_thread_count, @cache_entry.new("", nil)) if @reuse_connections
@cache_id = 0
- @cache = Array.new(actual_flush_thread_count, @cache_entry.new("", nil)) if @reuse_connections
- @cache_id = 0
We should not use the struct with `nil` or empty values if possible.
It makes the code harder to maintain.
lib/fluent/plugin/out_http.rb
Outdated
def make_request_cached(uri, req)
  id = Thread.current.thread_variable_get(plugin_id)
  if id.nil?
    @cache_id_mutex.synchronize {
      id = @cache_id
      @cache_id += 1
    }
    Thread.current.thread_variable_set(plugin_id, id)
  end
  uri_str = uri.to_s
  if @cache[id].uri != uri_str
    @cache[id].conn.finish if @cache[id].conn&.started?
    http = if @proxy_uri
             Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt)
           else
             Net::HTTP.start(uri.host, uri.port, @http_opt)
           end
    @cache[id] = @cache_entry.new(uri_str, http)
  end
  @cache[id].conn.request(req)
end
Sorry for the overall change, but I believe we can use simpler logic like this.
- def make_request_cached(uri, req)
-   id = Thread.current.thread_variable_get(plugin_id)
-   if id.nil?
-     @cache_id_mutex.synchronize {
-       id = @cache_id
-       @cache_id += 1
-     }
-     Thread.current.thread_variable_set(plugin_id, id)
-   end
-   uri_str = uri.to_s
-   if @cache[id].uri != uri_str
-     @cache[id].conn.finish if @cache[id].conn&.started?
-     http = if @proxy_uri
-              Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt)
-            else
-              Net::HTTP.start(uri.host, uri.port, @http_opt)
-            end
-     @cache[id] = @cache_entry.new(uri_str, http)
-   end
-   @cache[id].conn.request(req)
- end
+ def make_request_cached(uri, req)
+   thread_name = Thread.current.name
+   @cache_by_thread[thread_name] ||= CacheEntry.new(uri.to_s, make_request(uri, req))
+   cache = @cache_by_thread[thread_name]
+   unless cache.uri == uri.to_s
+     cache.conn.finish if cache.conn.started?
+     @cache_by_thread[thread_name] = CacheEntry.new(uri.to_s, make_request(uri, req))
+     cache = @cache_by_thread[thread_name]
+   end
+   cache.conn.request(req)
+ end
Note: we can use the thread name `flush_thread_{i}` for the key.
fluentd/lib/fluent/plugin/output.rb
Lines 506 to 511 in 0893c39
@buffer_config.flush_thread_count.times do |i|
  thread_title = "flush_thread_#{i}".to_sym
  thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new)
  thread = thread_create(thread_title) do
    flush_thread_run(thread_state)
  end
Hi @daipom, thanks for the review. I agree that using a hash makes the code simpler. Unfortunately, hashes are not thread-safe in Ruby, and inserting the CacheEntries could result in data races. Therefore, we would again need a mutex. The array solution had the advantage that the mutex was only used during a thread's first request, to retrieve the index. Afterwards, the code was lock-free.
@Garfield96
I think it's basically thread-safe, although some of the features are not thread-safe (e.g.
So, I don't expect my suggestion to cause data races.
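For reference, the conservative variant of the hash-based idea can be sketched as follows. It assumes one does not want to rely on implementation-specific `Hash` behaviour, so every access goes through a `Mutex`. `ConnectionCache`, `fetch`, and the use of `Object.new` as a fake connection are hypothetical names for illustration only; closing the old connection on a URI change is left out to keep the sketch short.

```ruby
CacheEntry = Struct.new(:uri, :conn)

# Hypothetical per-thread cache keyed by thread name, with every access
# guarded by a mutex.
class ConnectionCache
  def initialize
    @entries = {}
    @mutex = Mutex.new
  end

  # Return the calling thread's entry. The block builds a new connection
  # when the thread has no entry yet or the URI has changed.
  def fetch(uri)
    key = Thread.current.name || Thread.current.object_id
    @mutex.synchronize do
      entry = @entries[key]
      if entry.nil? || entry.uri != uri
        entry = CacheEntry.new(uri, yield)
        @entries[key] = entry
      end
      entry
    end
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

cache = ConnectionCache.new
threads = 4.times.map do |i|
  Thread.new do
    Thread.current.name = "flush_thread_#{i}"
    # Object.new stands in for an open connection.
    3.times.map { cache.fetch("http://example.test/") { Object.new }.conn }
  end
end
conns_per_thread = threads.map(&:value)
```

The trade-off discussed above is visible here: the hash version takes the lock on every request, whereas the array version only locks once per thread to hand out a slot index.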
Signed-off-by: Christian Norbert Menges <[email protected]>
Signed-off-by: Daijiro Fukuda <[email protected]> Co-authored-by: Daijiro Fukuda <[email protected]>
Rebased onto the latest master and added my DCO.
Signed-off-by: Daijiro Fukuda <[email protected]>
I have added only the following additional fixes.
I have not added commits about the logic simplification that I'm suggesting because we don't have a consensus in the discussion about data race concerns (#4330 (comment)).
Looks good to me, although there are still some discussions about code simplification.
lib/fluent/plugin/out_http.rb
Outdated
@@ -302,16 +318,41 @@ def create_request(chunk, uri)
  req
end

def make_request_cached(uri, req)
  id = Thread.current.thread_variable_get(plugin_id)
It's not appropriate to use `plugin_id` as a key; it should be used with a suffix like "#{plugin_id}_connection_cache_id".
In addition, we prefer to use `Thread#[]` instead of `Thread#thread_variable_get` if there is no particular reason.
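As background for this choice, here is a small sketch of how the two APIs differ in Ruby: `Thread#[]` storage is fiber-local, while `thread_variable_get`/`thread_variable_set` storage is shared by all fibers of a thread. The `plugin_id` value below is a made-up placeholder; only the suffix comes from the comment above.

```ruby
plugin_id = "object:abc" # hypothetical plugin id, for illustration only
key = "#{plugin_id}_connection_cache_id".to_sym

result = Thread.new do
  Thread.current[key] = 0                    # fiber-local storage (Thread#[])
  Thread.current.thread_variable_set(key, 1) # thread-local storage

  # A new fiber starts with empty fiber-local storage but still sees
  # the thread-local variable.
  in_fiber = Fiber.new do
    [Thread.current[key], Thread.current.thread_variable_get(key)]
  end.resume

  { same_fiber: Thread.current[key], in_fiber: in_fiber }
end.value
```

As long as the value is only read and written from the same fiber of the flush thread, both styles behave the same for this use case, so following the codebase convention is a reasonable default.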
Thanks. I agree.
Done in 9ce635c.
and following:
* Use `Thread#[]` style
  * because it is common in Fluentd code
* Rename `connection_cache_id` to `connection_cache_next_id`
  * for clarity.
Signed-off-by: Daijiro Fukuda <[email protected]>
LGTM
Thanks!
Thanks so much @Garfield96!
New feature of Fluentd v1.17.0. Related: fluent/fluentd#4330 Signed-off-by: Daijiro Fukuda <[email protected]>
Which issue(s) this PR fixes:
N/A
What this PR does / why we need it:
The HTTP output plugin recreates a connection for every HTTP request. In combination with TLS and high network latencies, this can result in very poor performance. This PR adds an option to reuse connections.
The implementation caches one connection per flush thread. The cache is part of the HTTP plugin instance and is implemented as an array. When a thread creates its first request, it is assigned a slot (id) in this array, and the id is stored in a thread-local variable. Since each slot is used exclusively by a single thread, no synchronization is required. If the endpoint changes at runtime, the old connection is closed and replaced by a new connection to the new endpoint. During shutdown, all open connections are finished in the `close` method.
This implementation works very well for a static endpoint. If the endpoint changes frequently, there will be little benefit. However, it can be assumed that most users use a static endpoint.
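The slot scheme described above can be sketched in isolation as follows. This is a simplified illustration, not the plugin code: `FakeConn` is a hypothetical stand-in for an open `Net::HTTP` connection, `SlotCache` and its method names are made up, and the number of slots is assumed to equal the number of threads that will ever call `request`.

```ruby
# Hypothetical stand-in for an open Net::HTTP connection.
class FakeConn
  attr_reader :uri

  def initialize(uri)
    @uri = uri
    @started = true
  end

  def started?
    @started
  end

  def finish
    @started = false
  end

  def request(req)
    "#{uri} <- #{req}"
  end
end

# One cache slot per thread; the mutex is only taken when a thread asks
# for its slot index for the first time.
class SlotCache
  Entry = Struct.new(:uri, :conn)
  KEY = :sketch_connection_cache_id # illustrative key name

  def initialize(slots)
    @cache = Array.new(slots) { Entry.new("", nil) }
    @next_id = 0
    @mutex = Mutex.new
  end

  def request(uri, req)
    id = Thread.current[KEY]
    if id.nil?
      id = @mutex.synchronize { n = @next_id; @next_id += 1; n }
      Thread.current[KEY] = id
    end
    entry = @cache[id]
    if entry.uri != uri
      # Endpoint changed (or first use): close the old connection, open a new one.
      entry.conn.finish if entry.conn&.started?
      entry = @cache[id] = Entry.new(uri, FakeConn.new(uri))
    end
    entry.conn.request(req)
  end

  # Finish every connection that is still open, as done on shutdown.
  def close
    @cache.each { |e| e.conn.finish if e.conn&.started? }
  end

  def open_count
    @cache.count { |e| e.conn&.started? }
  end
end

cache = SlotCache.new(2)
threads = 2.times.map do
  Thread.new do
    cache.request("http://a.test/", "r1")
    cache.request("http://a.test/", "r2") # reuses the cached connection
    cache.request("http://b.test/", "r3") # endpoint changed: reconnects
  end
end
results = threads.map(&:value)
open_before_close = cache.open_count
cache.close
open_after_close = cache.open_count
```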
Benchmark
I tested the throughput with two Fluentd instances on a single machine, one acting as sender and the other as receiver. Even though connection creation was very fast, because both instances were located on the same machine and TLS was not enabled, activating connection reuse doubled the throughput.
Sender configuration:
Receiver configuration:
Docs Changes:
fluent/fluentd-docs-gitbook#501
Release Note:
out_http: Add option to reuse connections