Skip to main content

tower/balance/p2c/
service.rs

1use super::super::error;
2use crate::discover::{Change, Discover};
3use crate::load::Load;
4use crate::ready_cache::{error::Failed, ReadyCache};
5use crate::util::rng::{sample_floyd2, HasherRng, Rng};
6use futures_util::future::{self, TryFutureExt};
7use std::hash::Hash;
8use std::marker::PhantomData;
9use std::{
10    fmt,
11    pin::Pin,
12    task::{ready, Context, Poll},
13};
14use tower_service::Service;
15use tracing::{debug, trace};
16
17/// Efficiently distributes requests across an arbitrary number of services.
18///
19/// See the [module-level documentation](..) for details.
20///
21/// Note that [`Balance`] requires that the [`Discover`] you use is [`Unpin`] in order to implement
22/// [`Service`]. This is because it needs to be accessed from [`Service::poll_ready`], which takes
23/// `&mut self`. You can achieve this easily by wrapping your [`Discover`] in [`Box::pin`] before you
24/// construct the [`Balance`] instance. For more details, see [#319].
25///
26/// [`Box::pin`]: std::boxed::Box::pin()
27/// [#319]: https://github.com/tower-rs/tower/issues/319
28pub struct Balance<D, Req>
29where
30    D: Discover,
31    D::Key: Hash,
32{
33    discover: D,
34
35    services: ReadyCache<D::Key, D::Service, Req>,
36    ready_index: Option<usize>,
37
38    rng: Box<dyn Rng + Send + Sync>,
39
40    _req: PhantomData<Req>,
41}
42
43impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
44where
45    D: fmt::Debug,
46    D::Key: Hash + fmt::Debug,
47    D::Service: fmt::Debug,
48{
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        f.debug_struct("Balance")
51            .field("discover", &self.discover)
52            .field("services", &self.services)
53            .finish()
54    }
55}
56
57impl<D, Req> Balance<D, Req>
58where
59    D: Discover,
60    D::Key: Hash,
61    D::Service: Service<Req>,
62    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
63{
64    /// Constructs a load balancer that uses operating system entropy.
65    pub fn new(discover: D) -> Self {
66        Self::from_rng(discover, HasherRng::default())
67    }
68
69    /// Constructs a load balancer seeded with the provided random number generator.
70    pub fn from_rng<R: Rng + Send + Sync + 'static>(discover: D, rng: R) -> Self {
71        let rng = Box::new(rng);
72        Self {
73            rng,
74            discover,
75            services: ReadyCache::default(),
76            ready_index: None,
77
78            _req: PhantomData,
79        }
80    }
81
82    /// Returns the number of endpoints currently tracked by the balancer.
83    pub fn len(&self) -> usize {
84        self.services.len()
85    }
86
87    /// Returns whether or not the balancer is empty.
88    pub fn is_empty(&self) -> bool {
89        self.services.is_empty()
90    }
91}
92
93impl<D, Req> Balance<D, Req>
94where
95    D: Discover + Unpin,
96    D::Key: Hash + Clone,
97    D::Error: Into<crate::BoxError>,
98    D::Service: Service<Req> + Load,
99    <D::Service as Load>::Metric: std::fmt::Debug,
100    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
101{
102    /// Polls `discover` for updates, adding new items to `not_ready`.
103    ///
104    /// Removals may alter the order of either `ready` or `not_ready`.
105    fn update_pending_from_discover(
106        &mut self,
107        cx: &mut Context<'_>,
108    ) -> Poll<Option<Result<(), error::Discover>>> {
109        debug!("updating from discover");
110        loop {
111            match ready!(Pin::new(&mut self.discover).poll_discover(cx))
112                .transpose()
113                .map_err(|e| error::Discover(e.into()))?
114            {
115                None => return Poll::Ready(None),
116                Some(Change::Remove(key)) => {
117                    trace!("remove");
118                    self.services.evict(&key);
119                    // `evict` removes from the ready set with `swap_remove`,
120                    // which can move a different endpoint into the slot a
121                    // previously-selected `ready_index` points at. Discard the
122                    // cached selection so `poll_ready` re-runs P2C over the
123                    // current ready set instead of dispatching to the
124                    // swapped-in endpoint.
125                    self.ready_index = None;
126                }
127                Some(Change::Insert(key, svc)) => {
128                    trace!("insert");
129                    // If this service already existed in the set, it will be
130                    // replaced as the new one becomes ready.
131                    self.services.push(key, svc);
132                }
133            }
134        }
135    }
136
137    fn promote_pending_to_ready(&mut self, cx: &mut Context<'_>) {
138        loop {
139            match self.services.poll_pending(cx) {
140                Poll::Ready(Ok(())) => {
141                    // There are no remaining pending services.
142                    debug_assert_eq!(self.services.pending_len(), 0);
143                    break;
144                }
145                Poll::Pending => {
146                    // None of the pending services are ready.
147                    debug_assert!(self.services.pending_len() > 0);
148                    break;
149                }
150                Poll::Ready(Err(error)) => {
151                    // An individual service was lost; continue processing
152                    // pending services.
153                    debug!(%error, "dropping failed endpoint");
154                }
155            }
156        }
157        trace!(
158            ready = %self.services.ready_len(),
159            pending = %self.services.pending_len(),
160            "poll_unready"
161        );
162    }
163
164    /// Performs P2C on inner services to find a suitable endpoint.
165    fn p2c_ready_index(&mut self) -> Option<usize> {
166        match self.services.ready_len() {
167            0 => None,
168            1 => Some(0),
169            len => {
170                // Get two distinct random indexes (in a random order) and
171                // compare the loads of the service at each index.
172                let [aidx, bidx] = sample_floyd2(&mut self.rng, len as u64);
173                debug_assert_ne!(aidx, bidx, "random indices must be distinct");
174
175                let aload = self.ready_index_load(aidx as usize);
176                let bload = self.ready_index_load(bidx as usize);
177                let chosen = if aload <= bload { aidx } else { bidx };
178
179                trace!(
180                    a.index = aidx,
181                    a.load = ?aload,
182                    b.index = bidx,
183                    b.load = ?bload,
184                    chosen = if chosen == aidx { "a" } else { "b" },
185                    "p2c",
186                );
187                Some(chosen as usize)
188            }
189        }
190    }
191
192    /// Accesses a ready endpoint by index and returns its current load.
193    fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
194        let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
195        svc.load()
196    }
197}
198
199impl<D, Req> Service<Req> for Balance<D, Req>
200where
201    D: Discover + Unpin,
202    D::Key: Hash + Clone,
203    D::Error: Into<crate::BoxError>,
204    D::Service: Service<Req> + Load,
205    <D::Service as Load>::Metric: std::fmt::Debug,
206    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
207{
208    type Response = <D::Service as Service<Req>>::Response;
209    type Error = crate::BoxError;
210    type Future = future::MapErr<
211        <D::Service as Service<Req>>::Future,
212        fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
213    >;
214
215    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
216        // `ready_index` may have already been set by a prior invocation. If a
217        // discovered removal evicts a service, `update_pending_from_discover`
218        // clears it, since eviction can reorder the ready set (via
219        // `swap_remove`) and leave the cached index pointing at a different
220        // endpoint.
221        let _ = self.update_pending_from_discover(cx)?;
222        self.promote_pending_to_ready(cx);
223
224        loop {
225            // If a service has already been selected, ensure that it is ready.
226            // This ensures that the underlying service is ready immediately
227            // before a request is dispatched to it (i.e. in the same task
228            // invocation). If, e.g., a failure detector has changed the state
229            // of the service, it may be evicted from the ready set so that
230            // another service can be selected.
231            if let Some(index) = self.ready_index.take() {
232                match self.services.check_ready_index(cx, index) {
233                    Ok(true) => {
234                        // The service remains ready.
235                        self.ready_index = Some(index);
236                        return Poll::Ready(Ok(()));
237                    }
238                    Ok(false) => {
239                        // The service is no longer ready. Try to find a new one.
240                        trace!("ready service became unavailable");
241                    }
242                    Err(Failed(_, error)) => {
243                        // The ready endpoint failed, so log the error and try
244                        // to find a new one.
245                        debug!(%error, "endpoint failed");
246                    }
247                }
248            }
249
250            // Select a new service by comparing two at random and using the
251            // lesser-loaded service.
252            self.ready_index = self.p2c_ready_index();
253            if self.ready_index.is_none() {
254                debug_assert_eq!(self.services.ready_len(), 0);
255                // We have previously registered interest in updates from
256                // discover and pending services.
257                return Poll::Pending;
258            }
259        }
260    }
261
262    fn call(&mut self, request: Req) -> Self::Future {
263        let index = self.ready_index.take().expect("called before ready");
264        self.services
265            .call_ready_index(index, request)
266            .map_err(Into::into)
267    }
268}