Dubbo加权轮询负载均衡算法应用之推荐产品

Dubbo加权轮询负载均衡算法,核心点:weight(固定的权重),currentWeight(当前权重,动态变化的),算法逻辑:轮询服务提供者(每个服务提供者都有weight和currentWeight),currentWeight增加weight,取最大的currentWeight,然后取对应的服务提供者,最后将取到的服务提供者的currentWeight减去总的权重(所有服务提供者的weight之和)。示例如下:

服务器 [A, B, C] 对应权重 [5, 1, 1] ,现在有7个请求依次进入负载均衡逻辑,选择过程如下:

请求编号currentWeight 数组选择结果减去权重总和后的 currentWeight 数组
1[5, 1, 1]A[-2, 1, 1]
2[3, 2, 2]A[-4, 2, 2]
3[1, 3, 3]B[1, -4, 3]
4[6, -3, 4]A[-1, -3, 4]
5[4, -2, 5]C[4, -2, -2]
6[9, -1, -1]A[2, -1, -1]
7[7, 0, 0]A[0, 0, 0]

如上,经过平滑性处理后,得到的服务器序列为 [A, A, B, A, C, A, A]。初始情况下 currentWeight = [0, 0, 0],第7个请求处理完后,currentWeight 再次变为 [0, 0, 0]。

业务实践应用:推荐产品(公司开发了同类型的多种产品,对接的是不同的合作商,现要求按一定的比例,向不同的客户推荐不同合作商的产品,保证各合作商的流量。比如合作商A的产品A:合作商B的产品B:合作商C的产品C=5:1:1,那么如果有7个客户想了解该类型的产品,就向其中5人推荐A,向其中1人推荐B,向其中1人推荐C)。

仿照Dubbo加权轮询负载均衡算法,实现推荐产品的算法,代码如下:

public class ProductMessage {
    private String productName;
    private int weight;

    public ProductMessage(String productName, int weight) {
        this.productName = productName;
        this.weight = weight;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public int getWeight() {
        return weight;
    }

    public void setWeight(int weight) {
        this.weight = weight;
    }
}
import java.util.concurrent.atomic.AtomicLong;

public class WeightedRoundRobin {
    private int weight;
    private AtomicLong current = new AtomicLong(0);
    private long lastUpdate;
    public int getWeight() {
        return weight;
    }
    public void setWeight(int weight) {
        this.weight = weight;
        current.set(0);
    }
    public long increaseCurrent() {
        return current.addAndGet(weight);
    }
    public void sel(int total) {
        current.addAndGet(-1 * total);
    }
    public long getLastUpdate() {
        return lastUpdate;
    }
    public void setLastUpdate(long lastUpdate) {
        this.lastUpdate = lastUpdate;
    }
}
import java.util.Map;
import java.util.List;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 业务场景
 * 轮询推荐产品,保证各产品按指定权重被推荐
 *
 */
public class ProductRoundRobin {
    private static int RECYCLE_PERIOD = 300000;
    private AtomicBoolean updateLock = new AtomicBoolean();

    private ConcurrentMap<String, WeightedRoundRobin> productMap = new ConcurrentHashMap<String, WeightedRoundRobin>();

    public ProductMessage selectProduct(List<ProductMessage> productMessageList) {
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        ProductMessage selectedProduct = null;
        WeightedRoundRobin selectedWRR = null;
        for (ProductMessage productMessage : productMessageList) {
            String identifyString = productMessage.toString();
            WeightedRoundRobin weightedRoundRobin = productMap.get(identifyString);
            int weight = productMessage.getWeight();
            if (weight < 0) {
                weight = 0;
            }
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                productMap.putIfAbsent(identifyString, weightedRoundRobin);
                weightedRoundRobin = productMap.get(identifyString);
            }
            if (weight != weightedRoundRobin.getWeight()) {
                weightedRoundRobin.setWeight(weight);
            }
            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedProduct = productMessage;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        }

        if (!updateLock.get() && productMessageList.size() != productMap.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                    newMap.putAll(productMap);
                    Iterator<Map.Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                    while (it.hasNext()) {
                        Map.Entry<String, WeightedRoundRobin> item = it.next();
                        if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                            it.remove();
                        }
                    }
                } finally {
                    updateLock.set(false);
                }
            }
        }

        if (selectedProduct != null) {
            selectedWRR.sel(totalWeight);
            return selectedProduct;
        }
        return productMessageList.get(0);
    }
}
import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;

public class ProductRoundRobinTest {
    public static void main(String[] args) {
        /**
         * 设定:
         * 产品A,权重5
         * 产品B,权重1
         * 产品C,权重1
         */
        ProductMessage product1 = new ProductMessage("产品A", 5);
        ProductMessage product2 = new ProductMessage("产品B", 1);
        ProductMessage product3 = new ProductMessage("产品C", 1);
        List<ProductMessage> productMessageList = new ArrayList<ProductMessage>() {{
            add(product1);
            add(product2);
            add(product3);
        }};

        ProductRoundRobin productRoundRobin = new ProductRoundRobin();

        // 进行7次推荐
        for (int i = 0; i < 7; i++) {
            ProductMessage selectedProduct = productRoundRobin.selectProduct(productMessageList);
            System.out.println("productName:" + selectedProduct.getProductName());
        }

        Map<String, Long> countMap = new HashMap<>();
        for (int i = 0; i < 1000000; i++) {
            ProductMessage selectedProduct = productRoundRobin.selectProduct(productMessageList);
            Long count = countMap.get(selectedProduct.getProductName());
            countMap.put(selectedProduct.getProductName(), count == null ? 1 : ++count);
        }

        for (Map.Entry<String, Long> entry : countMap.entrySet()) {
            System.out.println("introduce productName:" + entry.getKey() + "; introduce count:" + entry.getValue());
        }
    }
}

运行结果如下:

productName:产品A
productName:产品A
productName:产品B
productName:产品A
productName:产品C
productName:产品A
productName:产品A
introduce productName:产品A; introduce count:714286
introduce productName:产品C; introduce count:142857
introduce productName:产品B; introduce count:142857

总结:技术服务于业务,将算法逻辑,应用到实际业务中,让业务更加智能化。即所谓的科技赋能。

相关推荐