Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_LEVEL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.mock;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -176,6 +177,57 @@ private static void assertRackOrder(String rack, List<? extends DatanodeDetails>
}
}

@Test
public void sortDatanodesForWriteSortsRpcDeserializedPipeline() {
// Pipeline nodes arrive from SCM over RPC as deserialized DatanodeDetails
// with no topology linkage; the write sort must resolve them to OM's cluster
// map, otherwise every node is equidistant (MAX) and the order is random.
List<DatanodeDetails> rpcNodes = new ArrayList<>();
for (DatanodeDetails dn : nodeManager.getAllNodes()) {
rpcNodes.add(DatanodeDetails.getFromProtoBuf(dn.getProtoBufMessage()));
}
for (DatanodeDetails dn : nodeManager.getAllNodes()) {
// The client address is normally an IP, but the sort must resolve a client
// by either IP or hostname, so cover both.
List<? extends DatanodeDetails> byIp =
keyManager.sortDatanodesForWrite(rpcNodes, dn.getIpAddress());
assertEquals(dn, byIp.get(0),
"Source node should be sorted first for writes (IP client)");
assertRackOrder(dn.getNetworkLocation(), byIp);

List<? extends DatanodeDetails> byHostname =
keyManager.sortDatanodesForWrite(rpcNodes, dn.getHostName());
assertEquals(dn, byHostname.get(0),
"Source node should be sorted first for writes (hostname client)");
assertRackOrder(dn.getNetworkLocation(), byHostname);
}
}

@Test
public void sortDatanodesForWriteKeepsOrderForStaleTopology() {
List<DatanodeDetails> nodes = new ArrayList<>();
nodes.add(randomDatanodeDetails());
nodes.addAll(nodeManager.getAllNodes());

List<? extends DatanodeDetails> sorted =
keyManager.sortDatanodesForWrite(nodes, "edge0");

assertSame(nodes, sorted,
"Pipeline order should be preserved when a node is missing from the OM topology");
}

@Test
public void sortDatanodesForWriteKeepsOrderWhenClientUnresolved() {
List<? extends DatanodeDetails> nodes = nodeManager.getAllNodes();
List<DatanodeDetails> original = new ArrayList<>(nodes);
// A client that resolves to no known rack must NOT trigger a shuffle.
String unresolved = nodes.get(0).getIpAddress() + "X";
List<? extends DatanodeDetails> result =
keyManager.sortDatanodesForWrite(nodes, unresolved);
assertEquals(original, result,
"Write pipeline order must be preserved when client is unresolved");
}

private String nodeAddress(DatanodeDetails dn) {
boolean useHostname = config.getBoolean(
HddsConfigKeys.HDDS_DATANODE_USE_DN_HOSTNAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
Expand Down Expand Up @@ -363,4 +364,17 @@ DeleteKeysResult getPendingDeletionSubFiles(long volumeId, long bucketId, OmKeyI
* @return BackgroundService
*/
CompactionService getCompactionService();

/**
* Sort the datanodes of a write pipeline by network-topology distance to the
* client, using OM's locally cached cluster map. Unlike the read-path sort,
* the original order is preserved when the client cannot be resolved, because
* the first node is used as the streaming-write primary.
*
* @param nodes the pipeline nodes to sort
* @param clientMachine client address (IP or hostname)
* @return nodes sorted nearest-first, or the input order if client is unresolved
*/
List<? extends DatanodeDetails> sortDatanodesForWrite(
List<? extends DatanodeDetails> nodes, String clientMachine);
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand All @@ -124,6 +123,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.net.InnerNode;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.net.NodeImpl;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -2204,32 +2204,91 @@ private void sortDatanodes(String clientMachine, List<OmKeyInfo> keyInfos) {
@VisibleForTesting
public List<? extends DatanodeDetails> sortDatanodes(List<? extends DatanodeDetails> nodes,
String clientMachine) {
final Node client = getClientNode(clientMachine, nodes);
return ozoneManager.getClusterMap()
.sortByDistanceCost(client, nodes, nodes.size());
final NetworkTopology clusterMap = ozoneManager.getClusterMap();
final Node client = getClientNode(clientMachine, nodes, clusterMap);
return clusterMap.sortByDistanceCost(client, nodes, nodes.size());
}

@Override
public List<? extends DatanodeDetails> sortDatanodesForWrite(
List<? extends DatanodeDetails> nodes, String clientMachine) {
if (StringUtils.isEmpty(clientMachine)) {
// No client address: keep the pipeline order (the first node is the write
// primary). Mirrors SCMBlockProtocolServer#getClientNode's empty guard.
return nodes;
}
return captureLatencyNs(
metrics.getAllocateBlockSortDatanodesLatencyNs(), () -> {
final NetworkTopology clusterMap = ozoneManager.getClusterMap();
final Node client = getClientNode(clientMachine, nodes, clusterMap);
if (client == null) {
// Preserve pipeline order for writes: the first node is the write
// primary, so do not shuffle when the client cannot be resolved.
return nodes;
}
return sortByClusterMapDistance(clusterMap, client, nodes);
});
}

/**
* Sort a pipeline's nodes by topology distance to the client. The nodes come
* from SCM over RPC, so they are deserialized {@link DatanodeDetails} with no
* parent/level and would be treated as outside the topology (distance
* {@link Integer#MAX_VALUE}) and shuffled. Resolve each node (and a co-located
* client) to its canonical instance in OM's cluster map before sorting, then
* map the sorted order back to the original pipeline nodes.
*/
private List<? extends DatanodeDetails> sortByClusterMapDistance(
NetworkTopology clusterMap, Node client,
List<? extends DatanodeDetails> nodes) {
final Node reader = toClusterMapNode(clusterMap, client);
final List<Node> topologyNodes = new ArrayList<>(nodes.size());
final Map<String, DatanodeDetails> nodeByPath = new HashMap<>();
for (DatanodeDetails node : nodes) {
final Node resolved = clusterMap.getNode(node.getNetworkFullPath());
if (resolved == null) {
return nodes;
}
topologyNodes.add(resolved);
nodeByPath.put(resolved.getNetworkFullPath(), node);
}
final List<Node> sorted =
clusterMap.sortByDistanceCost(reader, topologyNodes, topologyNodes.size());
final List<DatanodeDetails> result = new ArrayList<>(sorted.size());
for (Node node : sorted) {
result.add(nodeByPath.get(node.getNetworkFullPath()));
}
return result;
}

/**
* Resolve a node to its canonical, topology-linked instance in the given
* cluster map, or return the input node if it is not in the map.
*/
private Node toClusterMapNode(NetworkTopology clusterMap, Node node) {
final Node resolved = clusterMap.getNode(node.getNetworkFullPath());
return resolved != null ? resolved : node;
}

private Node getClientNode(String clientMachine,
List<? extends DatanodeDetails> nodes) {
List<DatanodeDetails> matchingNodes = new ArrayList<>();
boolean useHostname = ozoneManager.getConfiguration().getBoolean(
HddsConfigKeys.HDDS_DATANODE_USE_DN_HOSTNAME,
HddsConfigKeys.HDDS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
List<? extends DatanodeDetails> nodes, NetworkTopology clusterMap) {
for (DatanodeDetails node : nodes) {
if ((useHostname ? node.getHostName() : node.getIpAddress()).equals(
clientMachine)) {
matchingNodes.add(node);
// Match by either IP or hostname, like SCM's getNodesByAddress; the client
// address is always an IP even when use.datanode.hostname is enabled.
if (clientMachine.equals(node.getIpAddress())
|| clientMachine.equals(node.getHostName())) {
return node;
}
}
return !matchingNodes.isEmpty() ? matchingNodes.get(0) :
getOtherNode(clientMachine);
return getOtherNode(clientMachine, clusterMap);
}

private Node getOtherNode(String clientMachine) {
private Node getOtherNode(String clientMachine,
NetworkTopology clusterMap) {
try {
String clientLocation = resolveNodeLocation(clientMachine);
if (clientLocation != null) {
Node rack = ozoneManager.getClusterMap().getNode(clientLocation);
Node rack = clusterMap.getNode(clientLocation);
if (rack instanceof InnerNode) {
return new NodeImpl(clientMachine, clientLocation,
(InnerNode) rack, rack.getLevel() + 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public class OMPerformanceMetrics {
@Metric(about = "Sort datanodes latency in getKeyInfo")
private MutableRate getKeyInfoSortDatanodesLatencyNs;

@Metric(about = "Sort datanodes latency in allocateBlock (streaming write)")
private MutableRate allocateBlockSortDatanodesLatencyNs;

@Metric(about = "resolveBucketLink latency in getKeyInfo")
private MutableRate getKeyInfoResolveBucketLatencyNs;

Expand Down Expand Up @@ -246,6 +249,10 @@ MutableRate getGetKeyInfoSortDatanodesLatencyNs() {
return getKeyInfoSortDatanodesLatencyNs;
}

MutableRate getAllocateBlockSortDatanodesLatencyNs() {
return allocateBlockSortDatanodesLatencyNs;
}

public void setForceContainerCacheRefresh(boolean value) {
forceContainerCacheRefresh.add(value ? 1L : 0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
ozoneManager.getOMServiceId(),
ozoneManager.getMetrics(),
keyArgs.getSortDatanodes(),
userInfo);
userInfo, ozoneManager.getKeyManager());

KeyArgs.Builder newKeyArgs = keyArgs.toBuilder()
.setModificationTime(Time.now()).setType(type).setFactor(factor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
ozoneManager.getPreallocateBlocksMax(),
ozoneManager.isGrpcBlockTokenEnabled(),
ozoneManager.getOMServiceId(), ozoneManager.getMetrics(),
keyArgs.getSortDatanodes(), userInfo);
keyArgs.getSortDatanodes(), userInfo, ozoneManager.getKeyManager());

// Set modification time and normalize key if required.
KeyArgs.Builder newKeyArgs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
ozoneManager.getOMServiceId(),
ozoneManager.getMetrics(),
keyArgs.getSortDatanodes(),
userInfo));
userInfo, ozoneManager.getKeyManager()));
effectiveDataSize = requestedSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,20 @@
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ipc_.Server;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmConfig;
Expand Down Expand Up @@ -190,25 +193,23 @@ protected List< OmKeyLocationInfo > allocateBlock(ScmClient scmClient,
ReplicationConfig replicationConfig, ExcludeList excludeList,
long requestedSize, long scmBlockSize, int preallocateBlocksMax,
boolean grpcBlockTokenEnabled, String serviceID, OMMetrics omMetrics,
boolean shouldSortDatanodes, UserInfo userInfo)
boolean shouldSortDatanodes, UserInfo userInfo, KeyManager keyManager)
throws IOException {
int dataGroupSize = replicationConfig instanceof ECReplicationConfig
? ((ECReplicationConfig) replicationConfig).getData() : 1;
int numBlocks = (int) Math.min(preallocateBlocksMax,
(requestedSize - 1) / (scmBlockSize * dataGroupSize) + 1);

String clientMachine = "";
if (shouldSortDatanodes) {
clientMachine = userInfo.getRemoteAddress();
}
final String clientMachine =
shouldSortDatanodes ? userInfo.getRemoteAddress() : "";

List<OmKeyLocationInfo> locationInfos = new ArrayList<>(numBlocks);
String remoteUser = getRemoteUser().getShortUserName();
List<AllocatedBlock> allocatedBlocks;
try {
allocatedBlocks = scmClient.getBlockClient()
.allocateBlock(scmBlockSize, numBlocks, replicationConfig, serviceID,
excludeList, clientMachine);
excludeList, "");
} catch (SCMException ex) {
omMetrics.incNumBlockAllocateCallFails();
if (ex.getResult()
Expand All @@ -218,13 +219,27 @@ protected List< OmKeyLocationInfo > allocateBlock(ScmClient scmClient,
}
throw ex;
}
// Cache the sorted order per pipeline so blocks sharing a pipeline are
// sorted once (mirrors the read path's per-pipeline caching).
final Map<List<DatanodeDetails>, List<? extends DatanodeDetails>> sortedByNodes =
shouldSortDatanodes ? new HashMap<>() : null;
for (AllocatedBlock allocatedBlock : allocatedBlocks) {
BlockID blockID = new BlockID(allocatedBlock.getBlockID());
Pipeline pipeline = allocatedBlock.getPipeline();
if (shouldSortDatanodes) {
final List<DatanodeDetails> nodes = pipeline.getNodes();
final List<? extends DatanodeDetails> sorted = sortedByNodes
.computeIfAbsent(nodes,
n -> keyManager.sortDatanodesForWrite(n, clientMachine));
if (!Objects.equals(sorted, pipeline.getNodesInOrder())) {
pipeline = pipeline.copyWithNodesInOrder(sorted);
}
}
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(blockID)
.setLength(scmBlockSize)
.setOffset(0)
.setPipeline(allocatedBlock.getPipeline());
.setPipeline(pipeline);
if (grpcBlockTokenEnabled) {
builder.setToken(secretManager.generateToken(remoteUser, blockID,
EnumSet.of(READ, WRITE), scmBlockSize));
Expand Down
Loading