1use super::super::error;
2use crate::discover::{Change, Discover};
3use crate::load::Load;
4use crate::ready_cache::{error::Failed, ReadyCache};
5use crate::util::rng::{sample_floyd2, HasherRng, Rng};
6use futures_util::future::{self, TryFutureExt};
7use std::hash::Hash;
8use std::marker::PhantomData;
9use std::{
10 fmt,
11 pin::Pin,
12 task::{ready, Context, Poll},
13};
14use tower_service::Service;
15use tracing::{debug, trace};
16
17pub struct Balance<D, Req>
29where
30 D: Discover,
31 D::Key: Hash,
32{
33 discover: D,
34
35 services: ReadyCache<D::Key, D::Service, Req>,
36 ready_index: Option<usize>,
37
38 rng: Box<dyn Rng + Send + Sync>,
39
40 _req: PhantomData<Req>,
41}
42
43impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
44where
45 D: fmt::Debug,
46 D::Key: Hash + fmt::Debug,
47 D::Service: fmt::Debug,
48{
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 f.debug_struct("Balance")
51 .field("discover", &self.discover)
52 .field("services", &self.services)
53 .finish()
54 }
55}
56
57impl<D, Req> Balance<D, Req>
58where
59 D: Discover,
60 D::Key: Hash,
61 D::Service: Service<Req>,
62 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
63{
64 pub fn new(discover: D) -> Self {
66 Self::from_rng(discover, HasherRng::default())
67 }
68
69 pub fn from_rng<R: Rng + Send + Sync + 'static>(discover: D, rng: R) -> Self {
71 let rng = Box::new(rng);
72 Self {
73 rng,
74 discover,
75 services: ReadyCache::default(),
76 ready_index: None,
77
78 _req: PhantomData,
79 }
80 }
81
82 pub fn len(&self) -> usize {
84 self.services.len()
85 }
86
87 pub fn is_empty(&self) -> bool {
89 self.services.is_empty()
90 }
91}
92
93impl<D, Req> Balance<D, Req>
94where
95 D: Discover + Unpin,
96 D::Key: Hash + Clone,
97 D::Error: Into<crate::BoxError>,
98 D::Service: Service<Req> + Load,
99 <D::Service as Load>::Metric: std::fmt::Debug,
100 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
101{
102 fn update_pending_from_discover(
106 &mut self,
107 cx: &mut Context<'_>,
108 ) -> Poll<Option<Result<(), error::Discover>>> {
109 debug!("updating from discover");
110 loop {
111 match ready!(Pin::new(&mut self.discover).poll_discover(cx))
112 .transpose()
113 .map_err(|e| error::Discover(e.into()))?
114 {
115 None => return Poll::Ready(None),
116 Some(Change::Remove(key)) => {
117 trace!("remove");
118 self.services.evict(&key);
119 self.ready_index = None;
126 }
127 Some(Change::Insert(key, svc)) => {
128 trace!("insert");
129 self.services.push(key, svc);
132 }
133 }
134 }
135 }
136
137 fn promote_pending_to_ready(&mut self, cx: &mut Context<'_>) {
138 loop {
139 match self.services.poll_pending(cx) {
140 Poll::Ready(Ok(())) => {
141 debug_assert_eq!(self.services.pending_len(), 0);
143 break;
144 }
145 Poll::Pending => {
146 debug_assert!(self.services.pending_len() > 0);
148 break;
149 }
150 Poll::Ready(Err(error)) => {
151 debug!(%error, "dropping failed endpoint");
154 }
155 }
156 }
157 trace!(
158 ready = %self.services.ready_len(),
159 pending = %self.services.pending_len(),
160 "poll_unready"
161 );
162 }
163
164 fn p2c_ready_index(&mut self) -> Option<usize> {
166 match self.services.ready_len() {
167 0 => None,
168 1 => Some(0),
169 len => {
170 let [aidx, bidx] = sample_floyd2(&mut self.rng, len as u64);
173 debug_assert_ne!(aidx, bidx, "random indices must be distinct");
174
175 let aload = self.ready_index_load(aidx as usize);
176 let bload = self.ready_index_load(bidx as usize);
177 let chosen = if aload <= bload { aidx } else { bidx };
178
179 trace!(
180 a.index = aidx,
181 a.load = ?aload,
182 b.index = bidx,
183 b.load = ?bload,
184 chosen = if chosen == aidx { "a" } else { "b" },
185 "p2c",
186 );
187 Some(chosen as usize)
188 }
189 }
190 }
191
192 fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
194 let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
195 svc.load()
196 }
197}
198
199impl<D, Req> Service<Req> for Balance<D, Req>
200where
201 D: Discover + Unpin,
202 D::Key: Hash + Clone,
203 D::Error: Into<crate::BoxError>,
204 D::Service: Service<Req> + Load,
205 <D::Service as Load>::Metric: std::fmt::Debug,
206 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
207{
208 type Response = <D::Service as Service<Req>>::Response;
209 type Error = crate::BoxError;
210 type Future = future::MapErr<
211 <D::Service as Service<Req>>::Future,
212 fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
213 >;
214
215 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
216 let _ = self.update_pending_from_discover(cx)?;
222 self.promote_pending_to_ready(cx);
223
224 loop {
225 if let Some(index) = self.ready_index.take() {
232 match self.services.check_ready_index(cx, index) {
233 Ok(true) => {
234 self.ready_index = Some(index);
236 return Poll::Ready(Ok(()));
237 }
238 Ok(false) => {
239 trace!("ready service became unavailable");
241 }
242 Err(Failed(_, error)) => {
243 debug!(%error, "endpoint failed");
246 }
247 }
248 }
249
250 self.ready_index = self.p2c_ready_index();
253 if self.ready_index.is_none() {
254 debug_assert_eq!(self.services.ready_len(), 0);
255 return Poll::Pending;
258 }
259 }
260 }
261
262 fn call(&mut self, request: Req) -> Self::Future {
263 let index = self.ready_index.take().expect("called before ready");
264 self.services
265 .call_ready_index(index, request)
266 .map_err(Into::into)
267 }
268}