读xxl-job源码分析如何选择/路由执行节点。其中一个策略是一致性hash。
分析
xxl-job中的一致性hash路由策略还是蛮简单的。根据jobId和执行器地址列表来做路由即可。
- 执行器地址(ip:port) hash到TreeMap
- 为了避免增减节点时负载不均衡,加入虚拟节点。每个物理节点虚拟为100个虚拟节点分散到TreeMap中
- 重写Hash算法避免原生hash算法不均衡问题
- 采用TreeMap的tailMap功能找到大于等于当前hash值的节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam;
import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; import java.util.SortedMap; import java.util.TreeMap;
public class ExecutorRouteConsistentHash extends ExecutorRouter {
private static int VIRTUAL_NODE_NUM = 100;
private static long hash(String key) {
MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new RuntimeException("MD5 not supported", e); } md5.reset(); byte[] keyBytes = null; try { keyBytes = key.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException("Unknown string :" + key, e); }
md5.update(keyBytes); byte[] digest = md5.digest();
long hashCode = ((long) (digest[3] & 0xFF) << 24) | ((long) (digest[2] & 0xFF) << 16) | ((long) (digest[1] & 0xFF) << 8) | (digest[0] & 0xFF);
long truncateHashCode = hashCode & 0xffffffffL; return truncateHashCode; }
public String hashJob(int jobId, List<String> addressList) {
TreeMap<Long, String> addressRing = new TreeMap<Long, String>(); for (String address: addressList) { for (int i = 0; i < VIRTUAL_NODE_NUM; i++) { long addressHash = hash("SHARD-" + address + "-NODE-" + i); addressRing.put(addressHash, address); } }
long jobHash = hash(String.valueOf(jobId)); SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash); if (!lastRing.isEmpty()) { return lastRing.get(lastRing.firstKey()); } return addressRing.firstEntry().getValue(); }
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = hashJob(triggerParam.getJobId(), addressList); return new ReturnT<String>(address); }
}
|
后记
之所以关注这个逻辑是因为线上运行xxl-job时有时候job会不调度了,不确定是因为应用发版还是进程假死(FULL GC频繁)引起的。理想情况下被调用的执行器有问题时会路由到健康的执行器上去。