"""
Tools for analysis of EBSD maps in form of .ang files
@author: Alexander Hartmaier, Abhishek Biswas, ICAMS, Ruhr-Universität Bochum
"""
import logging
import warnings
import numpy as np
import networkx as nx
import json
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
from kanapy.core import get_grain_geom
from scipy.stats import lognorm, vonmises
from scipy.spatial import ConvexHull, KDTree
from scipy.special import legendre, beta
from scipy.optimize import fminbound, linear_sum_assignment
from scipy.integrate import quad
from scipy.sparse import coo_matrix
from scipy.sparse.csgraph import connected_components
from skimage.segmentation import mark_boundaries
from orix import io
from orix import plot as ox_plot
from orix.quaternion import Orientation, symmetry
from orix.quaternion.symmetry import Symmetry
from orix.sampling import get_sample_fundamental
from orix.vector import Miller
from abc import ABC
from pathlib import Path
from collections import Counter, defaultdict
from typing import Any, Sequence, Union, Dict, Tuple, List
[docs]
def get_distinct_colormap(N, cmap='prism'):
    """
    Generate N visually distinct colors as a list of RGB tuples.

    Parameters
    ----------
    N : int
        Number of colors to generate.
    cmap : str, optional
        Name of the matplotlib colormap to sample from. Default is 'prism'.

    Returns
    -------
    list of tuple
        N RGB tuples with components in [0, 1] (alpha channel dropped).
    """
    sampler = plt.get_cmap(cmap, N)
    # sampler(i) returns RGBA; keep only the RGB components
    return [sampler(i)[:3] for i in range(N)]
[docs]
def neighbors(r, c, connectivity=8):
    """
    Return the neighboring coordinates of a cell in a 2D grid.

    No bounds checking is performed; returned indices may be negative or
    exceed the grid shape, so callers must filter out-of-range coordinates.

    Parameters
    ----------
    r : int
        Row index of the cell.
    c : int
        Column index of the cell.
    connectivity : int, optional
        Type of connectivity. Options are:
        - 1 or 4 : 4-connected neighbors (up, down, left, right)
        - any other value : 8-connected neighbors (includes diagonals)
        Default is 8.

    Returns
    -------
    neighbors : list of tuple
        List of (row, column) tuples representing neighboring cells.

    Examples
    --------
    >>> neighbors(2, 3, connectivity=4)
    [(3, 3), (1, 3), (2, 4), (2, 2)]
    >>> neighbors(2, 3)
    [(1, 2), (1, 3), (1, 4), (2, 2), (2, 4), (3, 2), (3, 3), (3, 4)]
    """
    if connectivity in (1, 4):
        return [(r + 1, c), (r - 1, c), (r, c + 1), (r, c - 1)]
    # 8-connectivity: every cell of the 3x3 block around (r, c) except the center
    return [(r + dr, c + dc) for dr in (-1, 0, 1) for dc in (-1, 0, 1)
            if (dr, dc) != (0, 0)]
[docs]
def merge_nodes(G, node1, node2):
    """
    Merge grain `node1` into grain `node2` and remove `node1` from the graph.

    The pixel lists are concatenated, the pixel-weighted average orientation
    and pixel count of `node2` are updated, the convex hull of `node2` is
    recomputed if present, edges of `node1` are transferred to `node2`, and
    all occurrences of `node1` in the graph's 'label_map' are relabeled.

    Parameters
    ----------
    G : networkx.Graph
        Graph whose nodes carry 'pixels', 'ori_av', 'npix' and optionally
        'hull'; G.graph provides 'label_map', 'dx', 'dy'.
    node1 : int
        Node ID to be merged and removed.
    node2 : int
        Node ID to merge into and retain.

    Notes
    -----
    Modifies `G` in place.
    """
    attr1 = G.nodes[node1]
    attr2 = G.nodes[node2]
    # combine pixel lists and form the pixel-weighted mean orientation
    attr2['pixels'] = np.concatenate((attr1['pixels'], attr2['pixels']))
    total = attr1['npix'] + attr2['npix']
    attr2['ori_av'] = (attr1['ori_av'] * attr1['npix'] +
                       attr2['ori_av'] * attr2['npix']) / total
    attr2['npix'] = total
    if 'hull' in attr2:
        # recompute convex hull from the merged pixel set, in micron units
        shape = G.graph['label_map'].shape
        pts = np.array(np.unravel_index(attr2['pixels'], shape), dtype=float).T
        pts[:, 0] *= G.graph['dx']
        pts[:, 1] *= G.graph['dy']
        attr2['hull'] = ConvexHull(pts)
    # transfer edges of node1 to node2 (duplicates are ignored by the graph)
    for nb in G.adj[node1]:
        if nb != node2:
            G.add_edge(node2, nb)
    G.remove_node(node1)
    # relabel node1 pixels in the label map
    rr, cc = np.nonzero(G.graph['label_map'] == node1)
    G.graph['label_map'][rr, cc] = node2
[docs]
def find_largest_neighbor(G, node):
    """
    Return the ID of the largest neighbor (by 'npix') of the given node.

    Parameters
    ----------
    G : networkx.Graph
        Graph whose nodes carry an 'npix' attribute (grain size in pixels).
    node : int
        Node ID whose largest neighbor is sought.

    Returns
    -------
    int
        Node ID of the neighbor with the largest 'npix' value.

    Raises
    ------
    ValueError
        If the node has no neighbors, or if the graph contains a circular
        edge (the "neighbor" is the node itself).
    """
    best_size = 0
    best_id = -1
    for nb in G.adj[node]:
        nb_size = G.nodes[nb]['npix']
        if nb_size > best_size:
            best_size = nb_size
            best_id = nb
    if best_id < 0:
        raise ValueError(f'Grain {node} has no neighbors.')
    if best_id == node:
        raise ValueError(f'Corrupted graph with circular edges: {node, best_id}.')
    return best_id
[docs]
def find_sim_neighbor(G, nn):
    """
    Return the neighbor of `nn` with the smallest misorientation angle.

    Misorientation angles are computed between the average orientation of the
    given node and the average orientations of all its neighbors, using the
    symmetry stored in ``G.graph['symmetry']``.

    Parameters
    ----------
    G : networkx.Graph
        Graph whose nodes carry an 'ori_av' attribute; G.graph provides
        'symmetry'.
    nn : int
        Node ID whose most similar neighbor is sought.

    Returns
    -------
    tuple
        (neighbor_id, angle): ID of the neighbor with the smallest
        misorientation and the corresponding angle.
    """
    sym = G.graph['symmetry']
    ref_ori = Orientation(G.nodes[nn]['ori_av'], sym)
    angles = []
    neigh_ids = []
    for nb in G.adj[nn]:
        nb_ori = Orientation(G.nodes[nb]['ori_av'], sym)
        angles.append(ref_ori.angle_with(nb_ori)[0])
        neigh_ids.append(nb)
    best = np.argmin(angles)
    return neigh_ids[best], angles[best]
[docs]
def summarize_labels(label_array, rotations, wanted_labels=None):
    """
    Summarize labeled pixels with pixel counts and orientation statistics.

    For each label in `label_array` (or the subset `wanted_labels`) the number
    of pixels, their 1D indices, the mean orientation vector and its standard
    deviation are computed.

    Parameters
    ----------
    label_array : ndarray of shape (H, W)
        2D array of integer labels.
    rotations : ndarray of shape (H*W, D)
        Orientation data for each pixel (e.g., emap.rotations.data).
    wanted_labels : sequence of int, optional
        Labels to summarize. If None, all labels in `label_array` are used.

    Returns
    -------
    list of tuple
        Tuples ``(label, info)`` where ``info`` holds 'npix', 'pixels'
        (1D indices into ``label_array.ravel()``), 'ori_av' and 'ori_std'.

    Notes
    -----
    Uses stable sorting plus ``np.add.reduceat`` for fast segmented sums;
    input arrays are not modified. Output order follows `wanted_labels` if
    given, otherwise the sorted unique labels.
    """
    flat = label_array.ravel()
    # sort pixel indices by label so every label forms a contiguous segment
    perm = np.argsort(flat, kind="stable")
    sorted_labels = flat[perm]
    # unique labels with segment starts and lengths
    uniq, seg_start, seg_len = np.unique(sorted_labels, return_index=True,
                                         return_counts=True)
    rot_seg = rotations[perm]
    # segmented sums via reduceat give per-label mean and standard deviation
    seg_sum = np.add.reduceat(rot_seg, seg_start, axis=0)
    seg_mean = seg_sum / seg_len[:, None]
    seg_sqsum = np.add.reduceat(rot_seg * rot_seg, seg_start, axis=0)
    seg_var = seg_sqsum / seg_len[:, None] - seg_mean ** 2
    seg_std = np.sqrt(np.maximum(seg_var, 0.0))  # clamp tiny negative variances
    # list of 1D pixel indices per label, in uniq order
    pix_per_label = np.split(perm, seg_start[1:])
    # select / reorder according to the requested labels
    if wanted_labels is None:
        out_labels = uniq
        sel = np.arange(len(uniq))
    else:
        out_labels = np.asarray(wanted_labels)
        lookup = {int(lbl): i for i, lbl in enumerate(uniq)}
        sel = np.array([lookup[int(lbl)] for lbl in out_labels], dtype=int)
    return [
        (int(lbl), {"npix": int(seg_len[i]),
                    "pixels": pix_per_label[i],
                    "ori_av": seg_mean[i],
                    "ori_std": seg_std[i]})
        for lbl, i in zip(out_labels, sel)
    ]
[docs]
def build_graph_from_labeled_pixels(label_array, rot, sym, dx, dy, connectivity=8):
    """
    Build a graph representation of a microstructure from labeled pixels.

    Each node in the graph represents a grain (label) and stores pixel indices,
    average orientation, and orientation statistics. Edges connect neighboring
    grains based on pixel adjacency.

    Parameters
    ----------
    label_array : ndarray of shape (H, W)
        2D array of integer labels representing grains.
    rot : ndarray of shape (H*W, D)
        Orientation data for each pixel (e.g., emap.rotations.data).
    sym : orix symmetry object
        Crystal symmetry, stored as graph attribute for later misorientation
        calculations.
    dx : float
        Pixel spacing in x direction.
    dy : float
        Pixel spacing in y direction.
    connectivity : int, optional
        Connectivity criterion for neighbors (4 or 8). Default is 8.

    Returns
    -------
    networkx.Graph
        Graph with:
        - Nodes representing grains, storing 'pixels', 'ori_av', 'ori_std', 'npix'
        - Edges representing neighboring grains (shared boundaries)
        - Graph attributes: 'label_map', 'symmetry', 'dx', 'dy'

    Notes
    -----
    - Uses `summarize_labels` to extract node properties from labeled pixels.
    - Loops over all pixels to add edges between neighboring grains; duplicate
      edges are ignored by networkx.
    - The shape and indices of `label_array` are preserved.
    """
    nodes = summarize_labels(label_array, rot)
    G = nx.Graph(label_map=label_array, symmetry=sym,
                 dx=dx, dy=dy)
    G.add_nodes_from(nodes)
    # add an edge wherever two adjacent pixels carry different labels
    rows, cols = label_array.shape
    for x in range(rows):
        for y in range(cols):
            label_here = label_array[x, y]
            for px, py in neighbors(x, y, connectivity=connectivity):
                if 0 <= px < rows and 0 <= py < cols:
                    neighbor_label = label_array[px, py]
                    if neighbor_label != label_here:
                        G.add_edge(label_here, neighbor_label)
    return G
[docs]
def visualize_graph(G, node_size=100, fs=12):
    """
    Draw a microstructure graph with labeled nodes using a spring layout.

    Parameters
    ----------
    G : networkx.Graph
        Graph representing the microstructure; node labels are displayed.
    node_size : int, optional
        Size of the plotted nodes. Default is 100.
    fs : int, optional
        Font size of the node labels. Default is 12.

    Notes
    -----
    Renders the plot with matplotlib; the graph is not modified. The layout
    seed is fixed for reproducible node positions.
    """
    layout = nx.spring_layout(G, seed=42)
    nx.draw(G, layout, with_labels=True,
            node_color='lightblue', edge_color='gray',
            node_size=node_size, font_size=fs)
    plt.show()
[docs]
def export_graph(G, filename, format="graphml"):
    """
    Write a microstructure graph to disk in GraphML or GEXF format.

    Parameters
    ----------
    G : networkx.Graph
        Graph representing the microstructure.
    filename : str
        Path of the output file.
    format : str, optional
        Output format, either 'graphml' or 'gexf'. Default is 'graphml'.

    Raises
    ------
    ValueError
        If an unsupported format is requested.
    """
    if format == "graphml":
        writer = nx.write_graphml
    elif format == "gexf":
        writer = nx.write_gexf
    else:
        raise ValueError("Only 'graphml' or 'gexf' formats are supported.")
    writer(G, filename)
[docs]
def find_similar_regions(array, tolerance=0.087, connectivity=1):
    """
    Label connected regions of similar values in a 2D array.

    Starting from each unvisited pixel, a depth-first flood fill collects all
    connected pixels whose value differs from the seed value by at most
    `tolerance`, and assigns them a common label.

    Parameters
    ----------
    array : ndarray of shape (H, W)
        2D input array with values to segment.
    tolerance : float, optional
        Maximum allowed difference to the seed value. Default is 0.087.
    connectivity : int, optional
        Neighbor criterion passed to `neighbors`: 1 for 4-connectivity,
        otherwise 8-connectivity. Default is 1.

    Returns
    -------
    labeled_array : ndarray of shape (H, W)
        Array where each connected region of similar values has a unique label.
    num_features : int
        Total number of connected regions found.

    Notes
    -----
    Similarity is always measured against the seed pixel's value, not
    transitively between neighbors. The input array is not modified.
    """
    array = np.asarray(array)
    n_rows, n_cols = array.shape
    visited = np.zeros(array.shape, dtype=bool)
    labels = np.zeros(array.shape, dtype=int)
    next_label = 1
    for row in range(n_rows):
        for col in range(n_cols):
            if visited[row, col]:
                continue
            seed_val = array[row, col]
            stack = [(row, col)]
            # iterative depth-first flood fill from the seed pixel
            while stack:
                i, j = stack.pop()
                if not (0 <= i < n_rows and 0 <= j < n_cols) or visited[i, j]:
                    continue
                if abs(array[i, j] - seed_val) <= tolerance:
                    visited[i, j] = True
                    labels[i, j] = next_label
                    stack.extend(neighbors(i, j, connectivity))
            next_label += 1
    return labels, next_label - 1
[docs]
def calc_error(odf_ref, odf_test, res=10.):
    """
    Compute the L1 difference between two normalized ODFs.

    Both ODFs are evaluated on a sample grid of the fundamental zone; the
    values are normalized to unit sum before taking the L1 norm of their
    difference.

    Parameters
    ----------
    odf_ref : ODF
        Reference ODF; must provide `.orientations.symmetry` and `.evaluate()`.
    odf_test : ODF
        Test ODF; must share the symmetry of `odf_ref`.
    res : float, optional
        Resolution (degrees) for sampling the fundamental zone. Default is 10.

    Returns
    -------
    float
        L1 error between the normalized ODFs.

    Raises
    ------
    RuntimeError
        If the symmetries of the two ODFs do not match.
    """
    sym = odf_ref.orientations.symmetry
    if sym != odf_test.orientations.symmetry:
        raise RuntimeError("Symmetries of ODF's do not match.")
    grid = get_sample_fundamental(resolution=res, point_group=sym)
    grid = Orientation(grid.data, symmetry=sym)
    v_ref = odf_ref.evaluate(grid)
    v_test = odf_test.evaluate(grid)
    # normalize both densities to unit sum before comparing
    return np.sum(np.abs(v_ref / np.sum(v_ref) - v_test / np.sum(v_test)))
[docs]
def calc_orientations(odf, nori, res=None):
    """
    Sample `nori` orientations from an ODF by Monte Carlo acceptance.

    A fine grid of the fundamental zone is generated and orientations are
    accepted with probability proportional to the ODF value at each grid
    point, until the requested number is collected.

    Parameters
    ----------
    odf : ODF
        Distribution to sample from; must provide `.orientations.symmetry`,
        `.halfwidth` and `.evaluate()`.
    nori : int
        Number of orientations to generate.
    res : float, optional
        Angular resolution in degrees of the sampling grid. If None, a value
        between 2° and 5° is derived from the ODF halfwidth.

    Returns
    -------
    Orientation
        `nori` orientations sampled from the ODF.

    Raises
    ------
    RuntimeError
        If the Monte Carlo sampling does not converge within 200 iterations.
    """
    cs = odf.orientations.symmetry
    hw = odf.halfwidth
    if res is None:
        res = np.clip(np.rad2deg(hw), 2, 5)
    # fine mesh in the fundamental region; resolution given in degrees
    grid = get_sample_fundamental(resolution=res, point_group=cs)
    grid = Orientation(grid.data, symmetry=cs)
    if grid.size < 5 * nori:
        logging.warning(f'Resolution of SO3 grid in "calc_orientations" is too coarse: {res}°.\n'
                        f'Only {grid.size} grid points available for {nori} desired orientations.')
    val = odf.evaluate(grid)  # ODF value at each grid point
    quats = np.zeros((nori, 4))
    filled = 0
    remaining = nori
    n_iter = 0
    # Monte Carlo acceptance: keep grid points whose scaled ODF value
    # exceeds a uniform random draw
    while remaining > 0 and n_iter < 200:
        rn = np.random.rand(grid.size)
        vn = val * np.linalg.norm(rn) * nori / np.linalg.norm(val)
        accepted = grid[vn >= rn]
        take = min(accepted.size, remaining)
        quats[filled:filled + take, :] = accepted.data[0:take, :]
        filled += take
        remaining -= take
        n_iter += 1
    if n_iter >= 200:
        raise RuntimeError(f'Monte Carlo algorithm in "calc_orientations" did not converge for resolution {res}.\n'
                           f'Orientations found {filled} orientations. Target was {nori}.')
    return Orientation(quats, symmetry=cs)
[docs]
def odf_est(ori, odf, nstep=50, step=0.5, halfwidth=None, verbose=False):
    """
    Estimate an ODF from orientations by iterative halfwidth adjustment.

    Starting from an initial halfwidth, the kernel halfwidth is increased in
    fixed steps; in each step the ODF built from `ori` is compared to the
    reference `odf` via `calc_error`. Iteration stops when the error starts
    to grow, and the ODF of the last halfwidth before the increase is
    returned.

    Parameters
    ----------
    ori : Orientation
        Orientations from which the ODF is estimated.
    odf : ODF
        Reference ODF used for the error comparison.
    nstep : int, optional
        Maximum number of halfwidth iterations. Default is 50.
    step : float, optional
        Halfwidth increment per iteration, in degrees. Default is 0.5.
    halfwidth : float, optional
        Initial halfwidth in radians. If None, it is derived from
        `odf.halfwidth`.
    verbose : bool, optional
        If True, print iteration details. Default is False.

    Returns
    -------
    ODF
        Estimated ODF for the input orientations.
    """
    st_rad = np.radians(step)
    hw_lo = np.radians(2)
    hw_hi = np.radians(30) - 0.5 * nstep * st_rad
    if halfwidth is None:
        hw = np.clip(odf.halfwidth - 0.5 * nstep * st_rad, hw_lo, hw_hi)
        if verbose:
            print(f'Initial halfwidth set to {np.degrees(hw)}')
    else:
        hw = np.clip(halfwidth - 2 * st_rad, hw_lo, hw_hi)
        if verbose:
            print(f'Initial halfwidth is {np.degrees(hw)}')
    best_err = 1e8
    for it in range(nstep):
        trial = ODF(ori, halfwidth=hw)
        err = calc_error(odf, trial)
        if verbose:
            print(f'Iteration {it}: error={err}, halfwidth={np.degrees(hw)}°')
        if err > best_err:
            break  # error started to grow: previous halfwidth was best
        best_err = err
        hw += st_rad
    # step back to the last halfwidth before the error increased
    hw -= st_rad
    return ODF(ori, halfwidth=hw)
[docs]
def find_orientations_fast(ori1, ori2, tol=1e-3):
    """
    Count, for each entry of ori2, the orientations in ori1 closest to it.

    A KDTree is built in 4D quaternion space over `ori2` (typically a regular
    SO3 grid) and every orientation in `ori1` is assigned to its nearest grid
    point.

    Parameters
    ----------
    ori1 : Orientation
        Orientation database (e.g., EBSD orientations).
    ori2 : Orientation
        Reference orientations / grid points to match against.
    tol : float, optional
        Angular tolerance in radians. NOTE(review): currently unused — nearest
        neighbors are accepted regardless of distance; kept for interface
        compatibility.

    Returns
    -------
    matches : np.ndarray of shape (ori2.size,)
        For each entry of `ori2`, the number of orientations in `ori1` whose
        nearest neighbor it is (0 if none).
    neigh_dict : dict
        Maps each index of `ori2` with at least one match to the list of
        indices in `ori1` assigned to it.
    """
    # KDTree in 4D quaternion space over ori2 (typically a regular SO3 grid)
    tree = KDTree(ori2.data)
    # for every orientation in ori1, find its nearest neighbor in ori2
    _, indices = tree.query(ori1.data)
    matches = np.zeros(ori2.size, dtype=int)
    neigh_dict = defaultdict(list)
    for i, idx in enumerate(indices):
        matches[idx] += 1
        neigh_dict[idx].append(i)
    return matches, dict(neigh_dict)
[docs]
def texture_reconstruction(ns, ebsd=None, ebsdfile=None, orientations=None,
                           grainsfile=None, grains=None, kernel=None, kernel_halfwidth=5,
                           res_low=5, res_high=25, res_step=2, lim=5, hw_init=None, verbose=False):
    """
    Reconstruct a reduced ODF from EBSD or orientation data.

    This function systematically reconstructs an orientation distribution function (ODF)
    using a given number of orientations or grains in a representative volume element (RVE).
    The reconstructed ODF approximates the misorientation distribution of the input data.
    Follows the method described in Biswas et al. (https://doi.org/10.1107/S1600576719017138).

    Parameters
    ----------
    ns : int
        Number of reduced orientations/grains in the RVE.
    ebsd : EBSDmap, optional
        EBSD map containing orientations for a single phase.
    ebsdfile : str, optional
        Path to a *.mat file with EBSD data (not yet supported).
    orientations : Orientation, optional
        Predefined orientations to use instead of EBSD data.
    grainsfile : str, optional
        Path to estimated grains file (*.mat). Not used for kernel estimation.
    grains : object, optional
        Grain data. Not used for kernel estimation.
    kernel : DeLaValleePoussinKernel, optional
        Kernel for ODF estimation. Default: halfwidth=kernel_halfwidth.
    kernel_halfwidth : float, optional
        Halfwidth in degrees for default DeLaValleePoussin kernel. Default is 5°.
    res_low : float, optional
        Minimum resolution (in degrees) of orientation grid for reconstruction. Default 5°.
    res_high : float, optional
        Maximum resolution (in degrees) of orientation grid. Default 25°.
    res_step : float, optional
        Step size (in degrees) for grid resolution. Default 2°.
    lim : int, optional
        Maximum number of consecutive worsening errors before stopping. Default 5.
    hw_init : float, optional
        Initial kernel halfwidth in radians passed to `odf_est` for the first
        grid resolution; subsequent resolutions reuse the previously fitted
        halfwidth. Default is None.
    verbose : bool, optional
        If True, print progress and error information. Default False.

    Returns
    -------
    orired_f : Orientation
        Reduced set of orientations in the fundamental region.
    odfred_f : ODF
        Reconstructed ODF using the reduced orientations.
    ero : float
        L1 error between reference ODF and reconstructed ODF.
    res : float
        Grid resolution (degrees) at which the minimum error was achieved.

    Notes
    -----
    - Uses Monte Carlo sampling and kernel smoothing to generate reduced orientations.
    - Only supports single-phase EBSD data.
    - Kernel estimation from grains or files is not yet supported; default kernel is used.
    - The reconstructed ODF approximates the misorientation distribution of the input data.
    """
    # Resolve the orientation source: exactly one of ebsd / orientations is
    # used; ebsd takes precedence if both are given.
    ori = None
    psi = None
    if ebsdfile is not None:
        raise ModuleNotFoundError('Option "ebsdMatFile" is not yet supported.')
        # ind = args.index('ebsdMatFile') + 1
        # ebsd = loadmat(args[ind])
        # ebsd_var = list(ebsd.keys())[0]
        # ebsd = ebsd[ebsd_var]
        # assert len(np.unique(ebsd.phaseId)) == 1, 'EBSD has multiple phases'
        # ori = ebsd.orientations
    if ebsd is not None:
        if not isinstance(ebsd, EBSDmap):
            raise TypeError('Argument "ebsd" must be of type EBSDmap.')
        assert len(ebsd.emap.phases.ids) == 1, 'EBSD has multiple phases'
        if ori is not None:
            logging.warning('Both arguments "ebsd" and "ori" are given, using EBSD map orientations.')
        else:
            ori = ebsd.emap.orientations
    if orientations is not None:
        if not isinstance(orientations, Orientation):
            raise TypeError('Argument "orientations" must be of type Orientation,')
        if ori is not None:
            logging.warning('Both EBSD map and orientations are given, using EBSD.')
        else:
            ori = orientations
    if grainsfile is not None:
        logging.warning('Estimation of optimal kernel from grain files not supported. '
                        'DeLaValleePoussinKernel with halfwidth 5° will be used.\n'
                        'Please use kanapy-mtex for support of optimal kernels.')
        # ind = args.index('grainsMatFile') + 1
        # grains = loadmat(args[ind])
        # grains_var = list(grains.keys())[0]
        # grains = grains[grains_var]
        # assert len(np.unique(grains.phaseId)) == 1, 'Grains has multiple phases'
        # print('Optimum kernel estimated from mean orientations of grains')
        # psi = calcKernel(grains.meanOrientation)
    if grains is not None:
        logging.warning('Estimation of optimal kernel from grains not supported. '
                        'DeLaValleePoussinKernel with halfwidth 5° will be used.\n'
                        'Please use kanapy-mtex for support of optimal kernels.')
        #assert len(np.unique(grains.phaseId)) == 1, 'Grains has multiple phases'
        #print('Optimum kernel estimated from mean orientations of grains')
        #psi = calcKernel(grains.meanOrientation)
    if kernel is not None:
        if not isinstance(kernel, DeLaValleePoussinKernel):
            # NOTE(review): error message misspells "DeLaValleePoussinKernel"
            raise TypeError('Only kernels of type "DeLaValeePoussinKernel" are supported.')
        psi = kernel
    if psi is None:
        psi = DeLaValleePoussinKernel(halfwidth=np.radians(kernel_halfwidth))
    # Step 1: Create reference odf from given orientations with proper kernel
    odf = ODF(ori, kernel=psi)
    cs = ori.symmetry
    ero = 10.  # initial error bound; L1 error of normalized ODFs is always smaller
    e_mod = []  # error history over grid resolutions
    hw_stored = hw_init
    for hw in np.arange(res_low, res_high + res_step, res_step):
        # Step 2: create equispaced grid of orientations
        S3G = get_sample_fundamental(resolution=hw, point_group=cs)  # resolution in degrees! ori.SS not considered
        S3G = Orientation(S3G.data, symmetry=cs)
        # Step 3: calculate number of orientations close to each grid point (0 if no orientation close to GP)
        # count close orientations from EBSD map for each grid point, and get list of neighbor indices
        M, neighs = find_orientations_fast(ori, S3G, tol=np.radians(0.5))
        ictr = np.nonzero(M > 0)[0]  # indices of gridpoints with non-zero counts
        weights = M[ictr]  # create weights from non-zero counts of EBSD orientations at gridpoints
        weights = weights / np.sum(weights)  # calculate weights
        # Step 4: Calculate scaling factor hval such that sum of all int(weights*hval) = ns
        # (bisection with expanding upper bound)
        lval = 0
        hval = float(ns)
        ifc = 1.0
        ihval = np.sum(np.round(hval * weights))
        while (hval - lval > hval * 1e-15 or ihval < ns) and ihval != ns:
            if ihval < ns:
                # target not reached: expand search interval upwards
                hval_old = hval
                hval = hval + ifc * (hval - lval) / 2.0
                lval = hval_old
                ifc = ifc * 2.0
            else:
                # overshoot: bisect downwards
                hval = (lval + hval) / 2.0
                ifc = 1.0
            ihval = np.sum(np.round(hval * weights))
        screen = np.round(weights * hval)  # number of orientations associated to each grid point
        diff = np.sum(screen) - ns  # difference to desired number of orientations
        weights_loc = np.argsort(weights)
        # remove surplus orientations starting from the smallest weights
        # NOTE(review): index starts at 1, skipping the very smallest weight — verify intent
        co = 1
        while diff > 0:
            if screen[weights_loc[co]] > 0:
                screen[weights_loc[co]] = screen[weights_loc[co]] - 1
                diff = np.sum(screen) - ns
            co = co + 1
        # Step 5: Subdivide orientations around grid points into screen orientations
        # and estimate mean orientation for each group or take orientation of grid point
        # if only one orientation needs to be generated
        ori_red = np.zeros((ns, 4))
        octr = 0  # number of reduced orientations generated so far
        for i, no in enumerate(screen):
            nl = neighs[ictr[i]]  # indices in ori assigned to this grid point
            if len(nl) < no:
                raise RuntimeError(f'{len(nl)} < {no} @ {i}, ictr: {ictr[i]}, weight: {weights[i]}')
            if 0.5 < no < 1.5:
                # exactly one orientation requested: use the grid point itself
                ori_red[octr, :] = S3G[ictr[i]].data
                octr += 1
                assert np.isclose(no, 1), f'no: {no}, should be 1'
            elif no >= 1.5:
                # split orientations in EBSD map matching to one grid point according to required number of
                # orientations at this point
                ind = np.linspace(0, len(nl), int(no)+1, dtype=int)
                idx = np.split(np.array(nl), ind[1:-1])
                ho = octr
                for j in idx:
                    # quaternion component-wise mean of each sub-group
                    # NOTE(review): result is not re-normalized — verify downstream handling
                    ori_red[octr, :] = np.mean(ori[j].data, axis=0)
                    octr += 1
                if not np.isclose(no, octr-ho):
                    print(f'len_nl: {len(nl)}, len_idx: {len(idx)}')
                    raise RuntimeError(f'no: {no}, but increment is only #{octr-ho}')
        # create Orientations from quaternion array
        ori_f = Orientation(ori_red, symmetry=cs)
        assert not np.isnan(ori_f.data).any()
        assert not np.isinf(ori_f.data).any()
        # Step 6: Compute reduced ODF
        odfred = odf_est(ori_f, odf, halfwidth=hw_stored, verbose=verbose)
        hw_stored = odfred.halfwidth  # reuse fitted halfwidth for next resolution
        # Step 7: Compute error for kernel optimization
        er = calc_error(odf, odfred)
        if verbose:
            print(f'Resolution: {hw}, Error: {er}, Reduced HW: {np.degrees(odfred.halfwidth)}°')
        # store best results and evaluate stopping criterion
        if er < ero:
            orired_f = ori_f
            odfred_f = odfred
            ero = er
            res = hw
        e_mod.append(er)
        # stop after `lim` resolutions without improvement over the best error
        if len(e_mod) - np.argmin(e_mod) > lim:
            break
    orired_f = orired_f.in_euler_fundamental_region()
    if verbose:
        print(f'Final resolution: {res}, reduced HW: {np.degrees(odfred_f.halfwidth)}°')
    return orired_f, odfred_f, ero, res
[docs]
class Kernel(ABC):
"""
Abstract base class for kernels used in orientation distribution estimation.
Attributes
----------
A : ndarray
Flattened array of kernel parameters. Initialized to empty array if not provided.
"""
def __init__(self, A=None):
    """
    Initialize the kernel with an optional coefficient array.

    Parameters
    ----------
    A : array_like, optional
        Kernel coefficients; stored flattened to 1-D. An empty array is
        used if omitted.
    """
    if A is None:
        self.A = np.array([])
    else:
        self.A = np.array(A).flatten()
@property
def bandwidth(self):
    """
    Bandwidth of the kernel: number of coefficients in `A` minus one.

    Returns
    -------
    int
        Bandwidth of the kernel.
    """
    n_coeff = len(self.A)
    return n_coeff - 1
@bandwidth.setter
def bandwidth(self, L):
    """
    Set the kernel bandwidth by truncating the coefficient array.

    `A` is cut to at most ``L + 1`` elements; a value of `L` at or above
    the current bandwidth leaves the kernel unchanged.

    Parameters
    ----------
    L : int
        Desired bandwidth.
    """
    keep = min(L + 1, len(self.A))
    self.A = self.A[:keep]
def __str__(self):
    """
    Return a readable description of the kernel.

    Returns
    -------
    str
        Kernel type and its halfwidth in degrees.
    """
    hw_deg = np.degrees(self.halfwidth())
    return f"custom, halfwidth {hw_deg:.2f}°"
def __eq__(self, other):
    """
    Compare two kernels by their coefficient arrays.

    The arrays are truncated to the smaller of the two bandwidths; the
    kernels are considered equal if the relative difference is below 1e-6.

    Parameters
    ----------
    other : Kernel
        Kernel to compare against.

    Returns
    -------
    bool
        True if the kernels are considered equal.
    """
    L = min(self.bandwidth, other.bandwidth)
    diff = np.linalg.norm(self.A[:L + 1] - other.A[:L + 1])
    return diff / np.linalg.norm(self.A) < 1e-6
def __mul__(self, other):
    """
    Multiply two kernels element-wise with a (2l + 1) rescaling.

    Only the first ``L + 1`` coefficients are used, where L is the smaller
    bandwidth of the two kernels.

    Parameters
    ----------
    other : Kernel
        Kernel to multiply with.

    Returns
    -------
    Kernel
        New kernel with the combined coefficients.
    """
    L = min(self.bandwidth, other.bandwidth)
    scale = 2 * np.arange(L + 1) + 1
    return Kernel(self.A[:L + 1] * other.A[:L + 1] / scale)
def __pow__(self, p):
    """
    Raise the kernel coefficients to a power with (2l + 1) rescaling.

    Each coefficient is divided by (2l + 1), raised to the power `p`, and
    multiplied back by (2l + 1).

    Parameters
    ----------
    p : float
        Exponent applied to the scaled coefficients.

    Returns
    -------
    Kernel
        New kernel with the transformed coefficients.
    """
    scale = 2 * np.arange(self.bandwidth + 1) + 1
    return Kernel(((self.A / scale) ** p) * scale)
[docs]
def norm(self):
    """
    Compute the L2 norm of the element-wise squared coefficient array.

    Returns
    -------
    float
        L2 norm of the squared elements of `A`.
    """
    return np.linalg.norm(np.square(self.A))
[docs]
def cutA(self, fft_accuracy=1e-2):
    """
    Truncate the coefficient array `A` according to the FFT accuracy.

    Coefficients are damped by 1/(l + 1)^2; the array is cut at the first
    damped value below a threshold derived from `fft_accuracy`.

    Parameters
    ----------
    fft_accuracy : float, optional
        Desired FFT accuracy (default is 1e-2).

    Notes
    -----
    The first element is never truncated; `self.A` is modified in place.
    """
    epsilon = fft_accuracy / 150
    damped = self.A / np.arange(1, len(self.A) + 1) ** 2
    threshold = max(min(np.min(damped[1:]), 10 * epsilon), epsilon)
    below = np.where(damped[1:] <= threshold)[0]
    if below.size > 0:
        self.A = self.A[:below[0] + 2]
[docs]
def halfwidth(self):
    """
    Determine the halfwidth of the kernel.

    Searches for the angle omega that minimizes the squared difference
    between K(1) and 2*K(cos(omega/2)), i.e. the angle where the kernel
    drops to half of its peak value.

    Returns
    -------
    float
        Halfwidth angle in radians.
    """
    peak = self.K(1)

    def _objective(omega):
        # squared residual between the peak and twice the kernel value
        return (peak - 2 * self.K(np.cos(omega / 2))) ** 2

    return fminbound(_objective, 0, 3 * np.pi / 4)
[docs]
def K(self, co2):
    """
    Evaluate the kernel function at cos(omega/2) values.

    Parameters
    ----------
    co2 : float or ndarray
        Cosine of half the misorientation angle; clipped to [-1, 1].

    Returns
    -------
    float or ndarray
        Kernel value(s).
    """
    half_cos = np.clip(co2, -1, 1)
    angles = 2 * np.arccos(half_cos)
    return self._clenshawU(self.A, angles)
[docs]
def K_orientations(self, orientations_ref, orientations):
    """
    Evaluate the kernel for the misorientation between two orientation sets.

    Parameters
    ----------
    orientations_ref : Orientation
        Reference orientation set.
    orientations : Orientation
        Target orientation set.

    Returns
    -------
    ndarray
        Kernel values for the misorientation angles.
    """
    angles = orientations.angle_with(orientations_ref)
    return self.K(np.cos(angles / 2))
[docs]
def RK(self, d):
    """
    Evaluate the radial kernel for given cosine distances.

    Parameters
    ----------
    d : array_like
        Cosines of angles; clipped to [-1, 1].

    Returns
    -------
    ndarray
        Kernel values at the given distances.
    """
    clipped = np.clip(d, -1, 1)
    return self._clenshawL(self.A, clipped)
[docs]
def RRK(self, dh, dr):
    """
    Evaluate the kernel on two sets of cosine distances.

    Parameters
    ----------
    dh : array_like
        Cosines of angles for the first set; clipped to [-1, 1].
    dr : array_like
        Cosines of angles for the second set; clipped to [-1, 1].

    Returns
    -------
    ndarray
        (len(dh), len(dr)) array of non-negative kernel values.
    """
    dh = np.clip(dh, -1, 1)
    dr = np.clip(dr, -1, 1)
    n_deg = self.bandwidth + 1
    out = np.zeros((len(dh), len(dr)))
    # evaluate the Legendre polynomials along the shorter axis to keep
    # the number of polynomial evaluations small
    if len(dh) < len(dr):
        for row, h in enumerate(dh):
            pl = np.array([legendre(deg)(h) for deg in range(n_deg)])
            out[row, :] = self._clenshawL(pl * self.A, dr)
    else:
        for col, r in enumerate(dr):
            pl = np.array([legendre(deg)(r) for deg in range(n_deg)])
            out[:, col] = self._clenshawL(pl * self.A, dh)
    # kernel values must be non-negative
    np.clip(out, 0, None, out=out)
    return out
def _clenshawU(self, A, omega):
    """
    Evaluate the coefficient series in the Chebyshev-U basis.

    NOTE(review): despite the name, this is a direct summation rather
    than the Clenshaw recurrence; the summed term is kept verbatim from
    the original implementation to preserve behavior.

    Parameters
    ----------
    A : array_like
        Series coefficients.
    omega : array_like
        Angles in radians.

    Returns
    -------
    ndarray
        Series values at the given angles.
    """
    half = omega / 2
    cos_half = np.cos(half)
    total = A[0] * np.ones_like(half)
    for deg in range(1, len(A)):
        total += A[deg] * (np.cos(2 * deg * half)
                           + cos_half * np.cos((2 * deg - 1) * half)
                           + cos_half ** 2)
    return total
def _clenshawL(self, A, x):
    """
    Evaluate a coefficient series with a Clenshaw-type recurrence.

    NOTE(review): the recurrence uses 2*x (Chebyshev form) although the
    docstring of the original spoke of the Legendre basis -- kept
    verbatim to preserve behavior.

    Parameters
    ----------
    A : array_like
        Series coefficients.
    x : array_like
        Evaluation points, expected within [-1, 1].

    Returns
    -------
    ndarray or float
        Series values at the given points.
    """
    b_prev2 = 0.0
    b_prev1 = 0.0
    two_x = 2 * x
    for coeff in A[:0:-1]:
        b_prev2, b_prev1 = b_prev1, coeff + two_x * b_prev1 - b_prev2
    return A[0] + x * b_prev1 - b_prev2
[docs]
def calc_fourier(self, L, max_angle=np.pi, fft_accuracy=1e-2):
    """
    Compute the Fourier coefficients of the kernel up to order L.

    Integration stops early once ten consecutive coefficients fall
    below `fft_accuracy`.

    Parameters
    ----------
    L : int
        Maximum order of the Fourier coefficients.
    max_angle : float, optional
        Upper integration limit in radians (default is pi).
    fft_accuracy : float, optional
        Threshold below which coefficients count as negligible
        (default is 1e-2).

    Returns
    -------
    ndarray
        Fourier coefficients, at most L + 1 entries.
    """
    coeffs = []
    n_small = 0
    for deg in range(L + 1):
        def integrand(omega, deg=deg):
            # bind deg as default to avoid late-binding surprises
            return (self.K(np.cos(omega / 2))
                    * np.sin((2 * deg + 1) * omega / 2)
                    * np.sin(omega / 2))

        val = quad(integrand, 0, max_angle, limit=2000)[0] * 2 / np.pi
        coeffs.append(val)
        n_small = n_small + 1 if abs(val) < fft_accuracy else 0
        if n_small == 10:
            break
    return np.array(coeffs)
[docs]
def plot_K(self, n_points=200):
    """
    Plot the kernel function over the misorientation angle range [0, pi].

    Parameters
    ----------
    n_points : int, optional
        Number of sampling points (default is 200).
    """
    angles = np.linspace(0, np.pi, n_points)
    vals = self.K(np.cos(angles / 2))
    plt.figure()
    plt.plot(np.degrees(angles), vals)
    plt.xlabel("Misorientation angle (degrees)")
    plt.ylabel("K(cos(omega/2))")
    plt.title("Kernel Function")
    plt.grid(True)
    plt.show()
[docs]
class DeLaValleePoussinKernel(Kernel):
    """
    De La Vallee Poussin kernel for orientation distribution functions.

    Parameters
    ----------
    kappa : float, optional
        Shape parameter of the kernel; defaults to 90 when neither
        `kappa` nor `halfwidth` is given.
    halfwidth : float, optional
        Halfwidth in radians; overrides `kappa` if provided.
    bandwidth : int, optional
        Maximum degree of the series expansion; defaults to round(kappa).

    Attributes
    ----------
    A : ndarray
        Series coefficients of the kernel.
    kappa : float
        Shape parameter of the kernel.
    C : float
        Normalization constant.
    """

    def __init__(self, kappa=None, halfwidth=None, bandwidth=None):
        if halfwidth is not None:
            # invert the halfwidth relation to recover kappa
            kappa = 0.5 * np.log(0.5) / np.log(np.cos(0.5 * halfwidth))
        elif kappa is None:
            kappa = 90
        self.kappa = kappa
        L = round(kappa) if bandwidth is None else bandwidth
        # normalization constant expressed through the Beta function
        self.C = beta(1.5, 0.5) / beta(1.5, kappa + 0.5)
        A = np.ones(L + 1)
        A[1] = kappa / (kappa + 2)
        # three-term recurrence for the series coefficients
        # NOTE(review): the loop stops at l = L - 2, so A[L] keeps its
        # initial value of 1 before scaling -- confirm this is intended
        for l in range(1, L - 1):
            A[l + 1] = ((kappa - l + 1) * A[l - 1] - (2 * l + 1) * A[l]) / (kappa + l + 2)
        A *= 2 * np.arange(L + 1) + 1
        super().__init__(A)
        self.cutA()

    def K(self, co2):
        """
        Evaluate the kernel: C * co2 ** (2 * kappa).

        Parameters
        ----------
        co2 : float or ndarray
            Cosine of half the misorientation angle; clipped to [-1, 1].

        Returns
        -------
        float or ndarray
            Kernel value(s).
        """
        return self.C * np.clip(co2, -1, 1) ** (2 * self.kappa)

    def DK(self, co2):
        """
        Derivative of the kernel with respect to the misorientation.

        Parameters
        ----------
        co2 : float or ndarray
            Cosine of half the misorientation angle, within [-1, 1].

        Returns
        -------
        float or ndarray
            Derivative value(s).
        """
        sin_half = np.sqrt(1 - co2 ** 2)
        return -self.C * self.kappa * sin_half * co2 ** (2 * self.kappa - 1)

    def RK(self, t):
        """
        Evaluate the reduced (radial) kernel.

        Parameters
        ----------
        t : float or ndarray
            Normalized distance or cosine value.

        Returns
        -------
        float or ndarray
            Reduced kernel value(s).
        """
        return (1 + self.kappa) * ((1 + t) / 2) ** self.kappa

    def DRK(self, t):
        """
        Derivative of the reduced kernel.

        Parameters
        ----------
        t : float or ndarray
            Normalized distance or cosine value.

        Returns
        -------
        float or ndarray
            Derivative of the reduced kernel at `t`.
        """
        return self.kappa * (1 + self.kappa) * ((1 + t) / 2) ** (self.kappa - 1) / 2

    def halfwidth(self):
        """
        Misorientation angle where the kernel drops to half its maximum.

        Returns
        -------
        float
            Halfwidth angle in radians.
        """
        return 2 * np.arccos(0.5 ** (1 / (2 * self.kappa)))
[docs]
class ODF(object):
    """
    Kernel-density estimate of an Orientation Distribution Function (ODF).

    Parameters
    ----------
    orientations : orix.quaternion.Orientation
        Input orientation set.
    halfwidth : float, optional
        Kernel halfwidth in radians (default: 10 degrees).
    weights : array-like, optional
        Weights per orientation; uniform if None.
    kernel : Kernel instance, optional
        Kernel to use; DeLaValleePoussinKernel if None.
    exact : bool, optional
        If False and more than 1000 orientations are given, the set is
        thinned by striding to keep evaluation affordable.

    Attributes
    ----------
    orientations : orix.quaternion.Orientation
        Orientation set stored in the ODF.
    weights : ndarray
        Normalized weights of the orientations.
    kernel : Kernel
        Kernel used for density estimation.
    halfwidth : float
        Halfwidth of the kernel in radians.
    """

    def __init__(self, orientations, halfwidth=np.radians(10), weights=None, kernel=None, exact=False):
        if orientations.size == 0:
            raise ValueError("Orientation set is empty.")
        if weights is None:
            weights = np.ones(orientations.size) / orientations.size
        if kernel is None:
            kernel = DeLaValleePoussinKernel(halfwidth=halfwidth)
        hw = kernel.halfwidth()
        if orientations.size > 1000 and not exact:
            # thin the set to roughly 1000 orientations; placeholder for
            # a proper gridify step
            stride = max(1, orientations.size // 1000)
            orientations = orientations[::stride]
            weights = weights[::stride]
            weights = weights / np.sum(weights)
        self.orientations = orientations
        self.weights = weights
        self.kernel = kernel
        self.halfwidth = hw

    def evaluate(self, ori):
        """
        Evaluate the ODF at the given orientations.

        NOTE(review): the stored `weights` are not applied here; every
        orientation contributes equally -- confirm this is intended.

        Parameters
        ----------
        ori : Orientation
            Orientation(s) at which to evaluate the ODF.

        Returns
        -------
        ndarray
            ODF values at the specified orientations.
        """
        values = np.zeros(ori.size)
        for o in self.orientations:
            values += self.kernel.K_orientations(o, ori)
        return values
[docs]
class EBSDmap:
"""
Class to store attributes and methods to import EBSD maps
and filter out their statistical data needed to generate
synthetic RVEs
Parameters
----------
fname : str
Filename including path to EBSD file.
gs_min : float, optional
Minimum grain size in pixels. Grains smaller than this are disregarded
for statistical analysis. Default is 10.0.
vf_min : float, optional
Minimum volume fraction. Phases with smaller values are disregarded.
Default is 0.03.
max_angle : float, optional
Maximum misorientation angle (degrees) used for grain merging.
Default is 5.0.
connectivity : int, optional
Connectivity for grain identification. Default is 8.
show_plot : bool, optional
If True, plots microstructure maps. Default is True.
show_hist : bool, optional
If True, plots histograms of grain statistics. Default follows `show_plot`.
felzenszwalb : bool, optional
If True, applies Felzenszwalb segmentation. Default is False.
show_grains : bool, optional
If True, plots grain labeling. Default is False.
hist : bool, optional
Deprecated. Use `show_hist` instead.
plot : bool, optional
Deprecated. Use `show_plot` instead.
Attributes
----------
emap : object
Loaded EBSD map object.
sh_x, sh_y : int
Shape of EBSD map in pixels.
dx, dy : float
Pixel size in microns.
npx : int
Total number of pixels in the map.
ms_data : list of dict
Phase-specific microstructure information including:
- name : str — phase name
- vf : float — volume fraction
- index : int — phase index
- ori : Orientation — grain orientations
- cs : crystal symmetry / Laue group
- mo_map : ndarray — misorientation field
- rgb_im : ndarray — IPF color image
- ngrains : int — number of grains
- gs_param, ar_param, om_param : ndarray — grain size, aspect ratio, main axis angle parameters
- gs_data, ar_data, om_data : ndarray — raw distributions
- gs_moments, ar_moments, om_moments : list — distribution moments
- graph : object — microstructure graph
ci_map : ndarray
Optional confidence index map if available.
ngrains : int
Total number of grains after merging/pruning.
"""
def __init__(self, fname, gs_min=10.0, vf_min=0.03, max_angle=5.0, connectivity=8,
             show_plot=True, show_hist=None, felzenszwalb=False, show_grains=False,
             hist=None, plot=None):
    """Import an EBSD map and extract grain statistics for all phases."""

    def _reassign(pix, ori, phid, bads):
        """
        Reassign the orientation of a pixel from a same-phase neighbor.

        Parameters
        ----------
        pix : int
            Index of the pixel to be reassigned.
        ori : ndarray
            Orientations of all pixels; modified in place.
        phid : ndarray
            Phase IDs of all pixels.
        bads : set
            Indices of pixels that could not be reassigned previously.

        Returns
        -------
        None
            If no valid neighbor is found, `pix` is added to `bads`.
        """
        phase = phid[pix]
        ix = -1
        if pix - 1 >= 0 and phid[pix - 1] == phase and pix - 1 not in bads:
            ix = pix - 1
        elif pix + 1 < self.npx and phid[pix + 1] == phase and pix + 1 not in bads:
            ix = pix + 1
        # BUGFIX: the vertical-neighbor branches previously indexed phid at
        # pix - sh_x - 1 / pix + sh_x + 1 (one pixel off the tested neighbor)
        # and assigned "pix = ..." instead of "ix = ..." for the upper
        # neighbor, so that branch never reassigned anything.
        elif pix - self.sh_x >= 0 and phid[pix - self.sh_x] == phase and pix - self.sh_x not in bads:
            ix = pix - self.sh_x
        elif pix + self.sh_x < self.npx and phid[pix + self.sh_x] == phase and pix + self.sh_x not in bads:
            ix = pix + self.sh_x
        if ix >= 0:
            ori[pix] = ori[ix]  # pixel orientation is reassigned
        else:
            bads.add(pix)  # no valid neighbor found, queue pix again

    def _lognorm_hist(values, sig, offs, scale, title, xlabel, ylabel):
        """Plot a histogram of `values` with a fitted lognorm pdf overlay."""
        fig, ax = plt.subplots()
        hc, hb = np.histogram(values, bins=20)
        x0 = hb[0]
        ind = np.nonzero(hc == 1)[0]  # find first bin with count == 1
        x1 = hb[ind[0]] if len(ind) > 0 else max(values)
        x = np.linspace(x0, x1, 150)
        y = lognorm.pdf(x, sig, loc=offs, scale=scale)
        ax.plot(x, y, '-r', label='lognorm fit')
        ax.hist(values, bins=20, range=(x0, x1), density=True, label='data')
        plt.legend()
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.show()

    # interpret deprecated parameters
    if plot is not None:
        show_plot = plot
        logging.warning('Use of "plot" is deprecated, use argument "show_plot" instead.')
    if hist is not None:
        show_hist = hist
        # BUGFIX: message previously pointed to "show_plot"
        logging.warning('Use of "hist" is deprecated, use argument "show_hist" instead.')
    if show_hist is None:
        show_hist = show_plot
    max_angle *= np.pi / 180  # degrees -> radians
    self.ms_data = []
    # read EBSD map and return the orix object
    self.emap = io.load(fname)
    self.sh_x, self.sh_y = self.emap.shape  # shape of EBSD map in pixels
    self.dx, self.dy = self.emap.dx, self.emap.dy  # pixel spacing in micron
    self.npx = self.emap.size  # total number of pixels in EBSD map
    # determine number of phases and generate histogram
    phs = self.emap.phases.ids
    Nphase = len(phs)  # number of phases
    phist = np.histogram(self.emap.phase_id, Nphase)
    print(f'Imported EBSD map with {Nphase} phases.')
    if self.sh_x * self.sh_y != self.npx:
        # size mismatch indicates a staggered hexagonal grid
        from .ebsd_hex_grid import resample_ebsd_to_rect_grid
        hex_map = True
        self.npx = self.sh_x * self.sh_y
        logging.warning(f"Size of map ({self.npx} px) does not match its shape: {self.sh_x, self.sh_y}"
                        f"\nAssuming staggered hex grid of EBSD map")
        xy = np.stack([self.emap.x, self.emap.y]).T
    else:
        hex_map = False
        xy = None
        resample_ebsd_to_rect_grid = None
    # read phase names and calculate volume fractions and plots if active
    for n_ph, ind in enumerate(phs):
        if ind == -1:
            continue  # skip non-indexed pixels
        ih = np.searchsorted(phist[1], ind, side='right') - 1
        if ind == phist[1][-1]:
            ih -= 1  # the last bin edge belongs to the last bin
        vf = phist[0][ih] / self.emap.size
        if vf < vf_min:
            continue  # disregard phases with negligible volume fraction
        data = dict()  # initialize phase-specific data dictionary
        data['vf'] = vf
        data['name'] = self.emap.phases.names[n_ph]
        data['index'] = ind
        data['len_x'] = self.sh_x
        data['len_y'] = self.sh_y
        data['delta_x'] = self.dx
        data['delta_y'] = self.dy
        # generate phase-specific orientations
        if hex_map:
            if 'ci' in self.emap.prop.keys():
                val_ci = self.emap.prop['ci']
            else:
                val_ci = np.zeros(self.npx)
            mask = self.emap.phase_id == ind
            ori_e = Orientation.from_euler(self.emap[mask].orientations.in_euler_fundamental_region())
            ori_q = np.stack([ori_e.a, ori_e.b, ori_e.c, ori_e.d]).T
            xg, yg, phase_g, ci_g, quat_g = \
                resample_ebsd_to_rect_grid(xy[mask],
                                           self.emap.phase_id[mask],
                                           ori_q,
                                           val_ci[mask],
                                           nx=self.sh_y, ny=self.sh_x)
        # get Euler angles, removing NaN quaternions from the resampling
            q_flat = quat_g.reshape(-1, 4)
            mask = ~np.isnan(q_flat[:, 0])
            quat_g = q_flat[mask]
            ori_e = Orientation(quat_g).in_euler_fundamental_region()
            pid = phase_g.ravel()
        else:
            pid = self.emap.phase_id
            ori_e = self.emap[pid == ind].orientations.in_euler_fundamental_region()
            ci_g = None
            quat_g = None
        data['ori'] = Orientation.from_euler(ori_e)
        data['cs'] = self.emap.phases[ind].point_group.laue
        # assign bad pixels to one neighbor:
        # identify non-indexed pixels and pixels with low confidence index (CI)
        if 'ci' in self.emap.prop.keys():
            if hex_map:
                val = ci_g.ravel()
            else:
                val = self.emap.prop['ci']
            if len(val) == self.npx:
                self.ci_map = val
            else:
                self.ci_map = np.zeros(self.npx)
                # NOTE(review): this mask uses pid == 0 while the other
                # per-phase maps use pid == ind -- confirm this is intended
                self.ci_map[pid == 0] = val
            bad_pix = set(np.nonzero(val < 0.1)[0])
        else:
            bad_pix = set()
        if len(bad_pix) > 0:
            niter = 0
            print(f'Initial number of bad pixels: {len(bad_pix)}')
            nbad = len(bad_pix)
            while len(bad_pix) > 0 and niter < 2 * nbad:
                bp = bad_pix.pop()
                _reassign(bp, data['ori'], pid, bad_pix)
                niter += 1
            print(f'After {niter} loops: number of bad pixels: {len(bad_pix)}')
        # calculate misorientation field
        val = data['ori'].angle
        if len(val) == self.npx:
            bmap = val
        else:
            bmap = np.zeros(self.npx)
            bmap[pid == ind] = val
        data['mo_map'] = bmap
        # get IPF colors
        ipf_key = ox_plot.IPFColorKeyTSL(data['cs'])
        if data['ori'].size == self.npx:
            rgb_val = ipf_key.orientation2color(data['ori'])
        else:
            rgb_val = np.zeros((self.npx, 3))
            rgb_val[pid == ind, :] = ipf_key.orientation2color(data['ori'])
        data['rgb_im'] = np.reshape(rgb_val, (self.sh_x, self.sh_y, 3))
        # generate map with grain labels
        print('Identifying regions of homogeneous misorientations and assigning them to grains.')
        labels, n_regions = find_similar_regions(bmap.reshape((self.sh_x, self.sh_y)),
                                                 tolerance=max_angle, connectivity=connectivity)
        print(f"Phase #{data['index']} ({data['name']}): Identified Grains: {n_regions}")
        # build and visualize graph of unfiltered map
        print('Building microstructure graph.')
        rot = quat_g if hex_map else self.emap.rotations.data
        ms_graph = build_graph_from_labeled_pixels(labels, rot, data['cs'], self.dx, self.dy)
        ms_graph.name = 'Graph of microstructure'
        print('Starting to simplify microstructure graph.')
        # graph pruning step 1: remove grains that have no convex hull
        # and merge regions with similar misorientation
        grain_set = set(ms_graph.nodes)
        rem_grains = len(grain_set)
        while rem_grains > 0:
            num = grain_set.pop()  # get random ID of grain and remove it from the list
            nd = ms_graph.nodes[num]  # node to be considered
            rem_grains = len(grain_set)
            pts = np.array(np.unravel_index(nd['pixels'], (self.sh_x, self.sh_y)), dtype=float).T
            pts[:, 0] *= self.dx  # convert pixel distances to micron
            pts[:, 1] *= self.dy
            try:
                hull = ConvexHull(pts)
                ms_graph.nodes[num]['hull'] = hull
            except Exception:
                # grain has no convex hull: merge it into its largest neighbor
                num_ln = find_largest_neighbor(ms_graph, num)
                merge_nodes(ms_graph, num, num_ln)
                continue
            # search for neighbors with similar orientation
            sim_neigh, ori_neigh = find_sim_neighbor(ms_graph, num)
            if ori_neigh <= max_angle:
                merge_nodes(ms_graph, num, sim_neigh)
        self.ngrains = len(ms_graph.nodes)
        print(f'After merging of similar regions, {self.ngrains} grains left.')
        # graph pruning step 2: merge small grains into their largest neighbor grain
        grain_set = set(ms_graph.nodes)
        rem_grains = len(grain_set)
        while rem_grains > 0:
            num = grain_set.pop()  # get random ID of grain and remove it from the list
            rem_grains = len(grain_set)
            if ms_graph.nodes[num]['npix'] < gs_min:
                num_ln = find_largest_neighbor(ms_graph, num)
                merge_nodes(ms_graph, num, num_ln)
        self.ngrains = len(ms_graph.nodes)
        data['ngrains'] = self.ngrains
        print(f'After elimination of small grains, {self.ngrains} grains left.')
        # extract grain statistics and axes
        arr_a = []
        arr_b = []
        arr_eqd = []
        arr_om = []
        for num, node in ms_graph.nodes.items():
            hull = node['hull']
            # area of convex hull approximates grain better than pixel count
            eqd = 2.0 * (hull.volume / np.pi) ** 0.5
            pts = hull.points[hull.vertices]  # outer nodes of grain
            # analyze geometry of point cloud; std: 'ellipsis', failsafe: 'raw'
            ea, eb, va, vb = get_grain_geom(pts, method='ellipsis', two_dim=True)
            if eb < 0.01 * ea:
                logging.warning(f'Grain {num} has too high aspect ratio: main axes: {ea}, {eb}')
                eb = 0.01 * ea
            # rescale axes of ellipsis to ensure consistency with eqd
            sc_fct = eqd / np.sqrt(ea ** 2 + eb ** 2)
            ea *= sc_fct
            eb *= sc_fct
            omega = np.arccos(va[0])  # angle of major axis against y-axis of map in [0, pi]
            node['max_dia'] = ea
            node['min_dia'] = eb
            node['equ_dia'] = eqd
            node['maj_ax'] = va
            node['min_ax'] = vb
            node['omega'] = omega
            node['center'] = hull.points.mean(axis=0)
            arr_a.append(ea)
            arr_b.append(eb)
            arr_eqd.append(eqd)
            arr_om.append(omega)
        arr_om = np.array(arr_om)
        arr_a = np.array(arr_a)
        arr_b = np.array(arr_b)
        arr_eqd = np.array(arr_eqd)
        print('\n--------------------------------------------------------')
        print('Statistical microstructure parameters in pixel map ')
        print('--------------------------------------------------------')
        print(np.median(arr_a), np.std(arr_a))
        print(np.median(arr_b), np.std(arr_b))
        print(np.median(arr_eqd), np.std(arr_eqd))
        # fit lognormal distribution of equivalent diameters
        doffs = 0.
        deq_log = np.log(arr_eqd)
        dscale = np.exp(np.median(deq_log))
        dsig = np.std(deq_log)
        data['gs_param'] = np.array([dsig, doffs, dscale])
        data['gs_data'] = arr_eqd
        data['gs_moments'] = [dscale, dsig]
        if show_hist:
            _lognorm_hist(arr_eqd, dsig, doffs, dscale,
                          'Histogram of grain equivalent diameters',
                          'Equivalent diameter (micron)',
                          'Normalized frequency')
        # fit lognormal distribution of grain aspect ratios
        asp = arr_a / arr_b
        aoffs = 0.
        asp_log = np.log(asp)
        ascale = np.exp(np.median(asp_log))
        asig = np.std(asp_log)
        data['ar_param'] = np.array([asig, aoffs, ascale])
        data['ar_data'] = asp
        data['ar_moments'] = [ascale, asig]
        if show_hist:
            _lognorm_hist(asp, asig, aoffs, ascale,
                          'Histogram of grain aspect ratio',
                          'aspect ratio',
                          'normalized frequency')
        # angles of main axis:
        # fit von Mises distribution (circular normal distribution) to data
        kappa, oloc, oscale = vonmises.fit(2 * arr_om - np.pi)
        med_om = vonmises.median(kappa, loc=oloc)  # scale parameter has no effect on vonmises
        std_om = vonmises.std(kappa, loc=oloc)
        data['om_param'] = np.array([kappa, oloc])
        data['om_data'] = arr_om
        data['om_moments'] = [med_om, std_om]
        if show_hist:
            fig, ax = plt.subplots()
            x = np.linspace(-np.pi, np.pi, 200)
            y = vonmises.pdf(x, kappa, loc=oloc)
            ax.plot(0.5 * (x + np.pi), 2 * y, '-r', label='von Mises fit')
            ax.hist(arr_om, bins=40, density=True, label='data')
            plt.legend()
            plt.title('Histogram of tilt angles of major axes')
            plt.xlabel('angle (rad)')
            plt.ylabel('normalized frequency')
            plt.show()
        print(f"Analyzed microstructure of phase #{data['index']} ({data['name']}) with {self.ngrains} grains.")
        print(f'Median values: equiv. diameter: {dscale.round(3)} micron, ' +
              f'aspect ratio: {ascale.round(3)}, ' +
              f'tilt angle: {(med_om * 180 / np.pi).round(3)}°')
        print(f'Std. dev: equivalent diameter: {dsig.round(3)} micron, ' +
              f'aspect ratio: {asig.round(3)}, ' +
              f'tilt angle: {(std_om * 180 / np.pi).round(3)}°')
        data['graph'] = ms_graph
        self.ms_data.append(data)
    if show_plot:
        self.plot_mo_map()
        self.plot_ipf_map()
        self.plot_segmentation()
        self.plot_pf()
    if show_grains:
        self.plot_grains()
    if felzenszwalb:
        self.plot_felsenszwalb()
    return
[docs]
def calcORI(self, Ng, iphase=0, shared_area=None, nbins=12,
            res_low=5, res_high=25, res_step=2, lim=5, hw_init=None,
            verbose=False, full_output=False):
    """
    Estimate the optimum kernel half-width and produce a reduced set of
    orientations for a given number of grains.

    Parameters
    ----------
    Ng : int
        Number of grains for which orientations are requested.
    iphase : int, optional
        Phase for which data is requested. The default is 0.
    shared_area : array, optional
        Grain boundary data. The default is None.
    nbins : int, optional
        Number of bins for GB texture. The default is 12.

    Returns
    -------
    ori : (Ng, 3)-array
        Array with Ng Euler angles (plus ODF, error, and resolution data
        when `full_output` is True).

    Raises
    ------
    ModuleNotFoundError
        If `shared_area` is given; GB texture reconstruction is only
        available in kanapy-mtex.
    """
    phase_data = self.ms_data[iphase]
    orired, odfred, ero, res = texture_reconstruction(
        Ng, orientations=phase_data['ori'], res_low=res_low,
        res_high=res_high, res_step=res_step, lim=lim,
        hw_init=hw_init, verbose=verbose)
    if shared_area is not None:
        raise ModuleNotFoundError('Shared area is not implemented yet in pure Python version.\n'
                                  'Use kanapy-mtex for this option.')
    if full_output:
        return orired, odfred, ero, res
    return orired
[docs]
def showIPF(self):
    """
    Plot the inverse pole figure (IPF) color key for every phase
    in the EBSD map.
    """
    for phase_id in self.emap.phases.ids:
        laue = self.emap.phases[phase_id].point_group.laue
        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(111, projection="ipf", symmetry=laue, zorder=2)
        ax.plot_ipf_color_key(show_title=False)
        ax.patch.set_facecolor("None")
        plt.show()
[docs]
def plot_ci_map(self):
    """
    Plot the confidence index (CI) map of the EBSD data, if available.
    """
    if 'ci' not in self.emap.prop.keys():
        return
    plt.imshow(self.ci_map.reshape((self.sh_x, self.sh_y)))
    plt.title('CI values in EBSD map')
    plt.colorbar(label="CI")
    plt.show()
[docs]
def plot_pf(self, vector=None):
    """
    Plot pole figures for all phases along the given sample direction.

    Parameters
    ----------
    vector : array-like, optional
        Sample reference vector for the pole figure. Default is [0, 0, 1].

    Notes
    -----
    Plots <111> poles in the sample reference frame for each phase.
    """
    if vector is None:
        vector = [0, 0, 1]
    valid_ids = np.array(self.emap.phases.ids)
    valid_ids = valid_ids[valid_ids >= 0]
    for i, data in enumerate(self.ms_data):
        plot_pole_figure(data['ori'], self.emap.phases[valid_ids[i]], vector=vector)
[docs]
def plot_pf_proj(self, vector=None):
    """
    Plot projected pole figures for all phases along the given sample direction.

    Parameters
    ----------
    vector : array-like, optional
        Sample reference vector for the projected pole figure.
        Default is [0, 0, 1].

    Notes
    -----
    Uses a projection method to visualize <111> poles in the sample
    reference frame.
    """
    if vector is None:
        vector = [0, 0, 1]
    valid_ids = np.array(self.emap.phases.ids)
    valid_ids = valid_ids[valid_ids >= 0]
    for i, data in enumerate(self.ms_data):
        plot_pole_figure_proj(data['ori'], self.emap.phases[valid_ids[i]], vector=vector)
[docs]
def plot_grains_marked(self):
    """
    Plot grains with labels and their major/minor axes for all phases.

    Notes
    -----
    Each grain is annotated with its number; major axes are drawn in
    black, minor axes in red. A distinct colormap separates grains.
    """
    for data in self.ms_data:
        ngr = data['ngrains']
        cmap = LinearSegmentedColormap.from_list('segs', get_distinct_colormap(ngr), N=ngr)
        plt.imshow(data['graph'].graph['label_map'] / ngr, cmap=cmap)
        for num, node in data['graph'].nodes.items():
            ctr = node['center']
            plt.annotate(str(num), xy=(ctr[1], ctr[0]))
            # endpoints of major (rows 0-1) and minor (rows 2-3) axes
            ends = np.zeros((4, 2))
            ends[0, :] = ctr - node['max_dia'] * node['maj_ax']
            ends[1, :] = ctr + node['max_dia'] * node['maj_ax']
            ends[2, :] = ctr - node['min_dia'] * node['min_ax']
            ends[3, :] = ctr + node['min_dia'] * node['min_ax']
            ends[:, 0] /= self.dx  # micron -> pixel coordinates
            ends[:, 1] /= self.dy
            plt.plot(ends[0:2, 1], ends[0:2, 0], color='k')
            plt.plot(ends[2:4, 1], ends[2:4, 0], color='red')
        plt.title(f"Phase #{data['index']} ({data['name']}): Grain labels and axes: {ngr}")
        plt.colorbar(label='Grain Number')
        plt.show()
[docs]
def plot_mo_map(self):
    """
    Plot the misorientation map for all phases.

    Notes
    -----
    Shows each pixel's misorientation angle with respect to a reference
    orientation.
    """
    for data in self.ms_data:
        mo_field = data['mo_map'].reshape((self.sh_x, self.sh_y))
        plt.imshow(mo_field)
        plt.title(f"Phase #{data['index']} ({data['name']}): Misorientation angle wrt reference")
        plt.colorbar(label="Misorientation (rad)")
        plt.show()
[docs]
def plot_segmentation(self, show_mo=True, show_ipf=True):
    """
    Plot similarity-segmentation results for all phases.

    Parameters
    ----------
    show_mo : bool, optional
        If True, overlay grain boundaries on the misorientation map.
        Default is True.
    show_ipf : bool, optional
        If True, overlay grain boundaries on the IPF map. Default is True.
    """
    for data in self.ms_data:
        label_map = data['graph'].graph['label_map']
        if show_mo:
            gscale_map = data['mo_map'].reshape((self.sh_x, self.sh_y)) / np.pi
            plt.imshow(mark_boundaries(gscale_map, label_map))
            plt.title(f"Phase #{data['index']} ({data['name']}): Misorientation map with similarity segmentation")
            plt.show()
        if show_ipf:
            plt.imshow(mark_boundaries(data['rgb_im'], label_map))
            plt.title(f"Phase #{data['index']} ({data['name']}): IPF map with similarity segmentation")
            plt.show()
[docs]
def plot_felsenszwalb(self):
    """
    Segment the misorientation map with the Felzenszwalb algorithm and
    plot the boundaries for all phases.
    """
    from skimage.segmentation import felzenszwalb
    for data in self.ms_data:
        gscale_map = data['mo_map'].reshape((self.sh_x, self.sh_y)) / np.pi
        # previously tuned parameters: scale=300, sigma=0.6 (was 0.8), min_size=25
        seg_labels = felzenszwalb(gscale_map, scale=300, sigma=0.6, min_size=25)
        plt.imshow(mark_boundaries(gscale_map, seg_labels))
        plt.title(f"Phase #{data['index']} ({data['name']}): Misorientation map with Felzenszwalb segmentation")
        plt.show()
[docs]
def plot_graph(self):
    """
    Visualize the microstructure graph of every phase via `visualize_graph`.
    """
    for data in self.ms_data:
        visualize_graph(data['graph'])
[docs]
def plot_ipf_map(self):
    """
    Plot the IPF (inverse pole figure) map for all phases.

    Notes
    -----
    Pixels belonging to other phases appear black; each phase is shown
    in its own figure.
    """
    for data in self.ms_data:
        plt.figure(figsize=(12, 8))
        plt.imshow(data['rgb_im'].reshape((self.sh_x, self.sh_y, 3)))
        plt.show()
[docs]
def plot_grains(self, N=5):
    """
    Plot the N largest grains with one-hot maps, convex hulls, and principal axes.

    Parameters
    ----------
    N : int, optional
        Number of largest grains to plot for each phase. Default is 5.

    Notes
    -----
    Each grain is visualized with its pixels (gray), the vertices of its
    convex hull (white), the hull outline (yellow), and its principal axes:
    major axis in cyan, minor axis in green.
    """
    for data in self.ms_data:
        # select the N grains with the largest equivalent diameter;
        # starting from empty lists (instead of a dummy grain id 0 with
        # size 0) avoids a KeyError when a phase has fewer than N grains
        glist = []
        slist = []
        for num, node in data['graph'].nodes.items():
            hs = node['equ_dia']
            if len(slist) < N:
                glist.append(num)
                slist.append(hs)
            elif hs > min(slist):
                i = int(np.argmin(slist))
                glist[i] = num
                slist[i] = hs
        for num in glist:
            node = data['graph'].nodes[num]
            # one-hot map: grain pixels at 0.3, hull vertices at 1
            pix_c = np.unravel_index(node['pixels'], (self.sh_x, self.sh_y))
            oh_grain = np.zeros((self.sh_x, self.sh_y))
            oh_grain[pix_c] = 0.3
            ind = node['hull'].vertices
            oh_grain[pix_c[0][ind], pix_c[1][ind]] = 1
            plt.imshow(oh_grain, cmap='gray')
            # draw convex hull: consecutive edges, then the closing edge
            for i in range(len(ind) - 1):
                ix = [ind[i], ind[i + 1]]
                plt.plot(pix_c[1][ix], pix_c[0][ix], color='y')
            ix = [ind[-1], ind[0]]
            plt.plot(pix_c[1][ix], pix_c[0][ix], color='y')
            # principal axes scaled by max/min diameter, converted to pixel units
            ctr = node['center']
            pts = np.zeros((4, 2))
            pts[0, :] = ctr - node['max_dia'] * node['maj_ax']
            pts[1, :] = ctr + node['max_dia'] * node['maj_ax']
            pts[2, :] = ctr - node['min_dia'] * node['min_ax']
            pts[3, :] = ctr + node['min_dia'] * node['min_ax']
            pts[:, 0] /= self.dx
            pts[:, 1] /= self.dy
            plt.plot(pts[0:2, 1], pts[0:2, 0], color='cyan')
            plt.plot(pts[2:4, 1], pts[2:4, 0], color='green')
            plt.title(f"Phase #{data['index']} ({data['name']}): Grain #{num}")
            plt.show()
[docs]
def get_ipf_colors(ori_list, color_key=0):
    """
    Get RGB colors for a list of orientations.

    Parameters
    ----------
    ori_list : (N, 3) ndarray or Orientation
        List of N Euler angles in radians, or an orix Orientation object.
    color_key : int, optional
        Index of the IPF color key to use. Default is 0.
        NOTE(review): this parameter is currently not used by the
        implementation — the TSL color key is always applied.

    Returns
    -------
    colors : (N, 3) ndarray
        RGB values corresponding to each orientation.
    """
    oris = ori_list if isinstance(ori_list, Orientation) else Orientation.from_euler(ori_list)
    key = ox_plot.IPFColorKeyTSL(oris.symmetry)
    return key.orientation2color(oris)
[docs]
def createOriset(num, ang, omega, hist=None, shared_area=None,
                 cs=None, degree=True, Nbase=10000, resolution=None,
                 res_low=5, res_high=25, res_step=2, lim=5, hw_init=None,
                 verbose=False, full_output=False):
    """
    Create a set of Euler angles according to an ODF defined by input
    orientations and kernel half-width.

    Parameters
    ----------
    num : int
        Number of Euler angles to generate
    ang : (3,) or (M, 3) array-like or Orientation
        Input set of Euler angles (in degrees or radians) defining the ODF
    omega : float
        Kernel half-width (in degrees or radians)
    hist : array, optional
        Histogram of MDF. Default is None
    shared_area : array, optional
        Shared area between pairs of grains. Default is None
    cs : Symmetry, optional
        Crystal symmetry group. Default is 'm3m'
    degree : bool, optional
        If True, input angles and omega are in degrees. Default is True
    Nbase : int, optional
        Base number of orientations for artificial ODF. Default is 10000
    resolution : float, optional
        Resolution for orientation generation. If None, derived from omega
    res_low, res_high, res_step, lim : int, optional
        Parameters for texture reconstruction
    hw_init : float, optional
        Initial kernel half-width for texture reconstruction. Default is None
    verbose : bool, optional
        If True, prints progress messages. Default is False
    full_output : bool, optional
        If True, returns additional reconstruction outputs. Default is False

    Returns
    -------
    ori : (num, 3) ndarray
        Reduced set of Euler angles

    Notes
    -----
    Can generate artificial ODFs if input set is small. Grain-boundary-texture
    reconstruction with hist or shared_area requires the kanapy-mtex module.
    """
    # grain-boundary texture reconstruction is only available in kanapy-mtex
    if hist is not None or shared_area is not None:
        raise ModuleNotFoundError('The grain-boundary-texture module is currently only available in kanapy-mtex.')
    if cs is None or not isinstance(cs, Symmetry):
        logging.warning(f'Crystal Symmetry "cs" must be provided as Symmetry object, got {type(cs)}.')
    if resolution is None:
        # resolution must be given in degrees
        resolution = omega if degree else np.rad2deg(omega)
    if degree:
        omega = np.deg2rad(omega)
    if not isinstance(ang, Orientation):
        # accept any array-like set of Euler angles (list, tuple, ndarray),
        # as promised in the docstring, not only Python lists
        ang = np.asarray(ang, dtype=float)
        if degree:
            ang = np.deg2rad(ang)
        ang = Orientation.from_euler(ang, cs)
    if ang.size < Nbase / 100:
        # create artificial ODF for monomodal texture or small orientation set
        if verbose:
            logging.info(f'Creating artificial ODF centered around orientation {ang} with '
                         f'kernel halfwidth: {omega}')
        odf = ODF(ang, halfwidth=omega)
        if verbose:
            logging.info(f'Creating {Nbase} orientation from artificial ODF.')
        ori = calc_orientations(odf, Nbase, res=resolution)
        assert ori.size == Nbase
    else:
        ori = ang
    logging.info(f'Texture reconstruction generating {num} orientations from {ori.size} inputs. '
                 f'Kernel halfwidth: {omega}')
    ori_red, odfred, ero, res = texture_reconstruction(num, orientations=ori,
                                                       kernel_halfwidth=omega,
                                                       res_low=res_low, res_high=res_high,
                                                       res_step=res_step, lim=lim, hw_init=hw_init,
                                                       verbose=verbose)
    # hist is guaranteed to be None here (non-None raised above), so the
    # former dead mtex branch has been removed
    if full_output:
        return ori_red, odfred, ero, res
    return ori_red
[docs]
def createOrisetRandom(num, omega=None, hist=None, shared_area=None, cs=None, Nbase=None):
    """
    Create a set of Euler angles for a random texture.

    Parameters
    ----------
    num : int
        Number of Euler angles to generate
    omega : float, optional
        Kernel half-width in degrees; kept for interface compatibility with
        the kanapy-mtex implementation and currently unused here
    hist : array, optional
        Histogram of MDF. Default is None
    shared_area : array, optional
        Shared area between pairs of grains. Default is None
    cs : Symmetry, optional
        Crystal symmetry group. Default is 'm3m'
    Nbase : int, optional
        Base number of orientations for random texture; kept for interface
        compatibility and currently unused here

    Returns
    -------
    ori : (num, 3) ndarray
        Set of randomly distributed Euler angles

    Notes
    -----
    Unlike `createOriset`, this function directly generates `num` random
    orientations without reducing a larger artificial EBSD set.
    Grain-boundary-texture reconstruction using `hist` or `shared_area`
    requires the kanapy-mtex module.
    """
    if hist is not None or shared_area is not None:
        raise ModuleNotFoundError('The grain-boundary-texture module is currently only available in kanapy-mtex.')
    if cs is None or not isinstance(cs, Symmetry):
        logging.warning(f'Crystal Symmetry "cs" must be provided as Symmetry object, got {type(cs)}.')
    # hist is guaranteed to be None here (non-None raised above), so the
    # former dead mtex branch has been removed
    return Orientation.random(num, cs).in_euler_fundamental_region()
[docs]
def segment_microstructure(
        json_path: str,
        *,
        th_cut_deg: float = 5.0,
        th_merge_deg: float = 7.5,
        min_voxels: int = 4,
        print_topk_rag: int = 10,
        tiny_cutoff: int = 5,
        verbose: bool = True,
        log_path: str | None = None,
        progress: bool = False,
) -> None:
    """
    Segment all snapshots in a microstructure JSON in-place.

    - Snapshot 0 is assumed already segmented and is kept as reference for phase_id mapping.
    - Snapshots 1..last are re-segmented from voxel orientations using:
      Step 1: 6-neighbor voxel graph + ORIX misorientation on edges
      Step 2: threshold cut (<= th_cut_deg) + connected components
      Step 3: segment-RAG boundary stats + ORIX mean orientation per segment
      Step 4: merge/prune via union-find:
        Rule A: merge adjacent segments if mean-orientation miso <= th_merge_deg
        Rule B: absorb tiny components (<= min_voxels) into best neighbor using root-mean miso
      Step 5: diagnostics (post-merge boundary stats)
      Step 6/7: write voxel grain_id + rebuild grains[] with ORIX mean orientation

    This function overwrites `json_path` (no new file).

    Parameters
    ----------
    json_path : str
        Path to the JSON file to update in-place.
    th_cut_deg : float
        Edge cut threshold in degrees (keep edges <= th_cut_deg).
    th_merge_deg : float
        Merge threshold in degrees for Rule A (mean-orientation miso).
    min_voxels : int
        Tiny component cutoff (<= min_voxels) for Rule B.
    print_topk_rag : int
        Number of highest-contact boundaries printed for diagnostics.
    tiny_cutoff : int
        Diagnostics: count segments with <= tiny_cutoff voxels after Step 2.
    verbose : bool
        Print progress and diagnostics.
    log_path : str, optional
        If given, diagnostic output is appended to this file instead of stdout.
    progress : bool
        If True, wrap the snapshot loop in a tqdm progress bar (requires tqdm).

    Returns
    -------
    None
        Updates the JSON file in-place.
    """
    import warnings
    # quaternion conversions may emit ComplexWarning; silence it for that module only
    warnings.filterwarnings(
        "ignore",
        category=np.exceptions.ComplexWarning,
        module=r"quaternion(\.|$)",
    )
    # optional tqdm import only when needed (avoids hard dependency)
    pbar = None  # reserved; the iterator itself is wrapped by tqdm below
    if progress:
        try:
            from tqdm.auto import tqdm
        except Exception as e:
            raise ImportError(
                "progress=True requires tqdm. Install it via `pip install tqdm` or disable progress."
            ) from e
    # open Log file if requested (append mode) and route _p() to it
    log_fh = None
    if log_path is not None:
        log_fh = open(log_path, "a", encoding="utf-8")

    def _p(*args):
        # print helper: honors `verbose` and redirects to the log file if open
        if not verbose:
            return
        if log_fh is not None:
            print(*args, file=log_fh, flush=True)
        else:
            print(*args)

    try:
        # -----------------------------
        # helpers: union-find
        # -----------------------------
        def _uf_find(parent: np.ndarray, x: int) -> int:
            # find root of x with path halving (compresses as it walks)
            while parent[x] != x:
                parent[x] = parent[parent[x]]
                x = parent[x]
            return x

        def _uf_union(parent: np.ndarray, rank: np.ndarray, a: int, b: int) -> bool:
            # union by rank; returns True only if two distinct trees were merged
            ra, rb = _uf_find(parent, a), _uf_find(parent, b)
            if ra == rb:
                return False
            if rank[ra] < rank[rb]:
                parent[ra] = rb
            elif rank[ra] > rank[rb]:
                parent[rb] = ra
            else:
                parent[rb] = ra
                rank[ra] += 1
            return True

        # -----------------------------
        # Load JSON
        # -----------------------------
        _p("=== LOAD JSON ===")
        with open(json_path, "r") as f:
            data = json.load(f)
        if "microstructure" not in data:
            raise KeyError("Top-level key 'microstructure' not found.")
        micro = data["microstructure"]
        n_snaps = len(micro)
        if n_snaps < 2:
            _p("Nothing to do: only one snapshot found.")
            return
        # Snapshot 0 reference phase mapping: old grain_id -> phase_id
        snap0 = micro[0]
        if "grains" not in snap0:
            raise KeyError("Snapshot 0 must contain 'grains' for phase mapping.")
        grainid_to_phase = {int(g["grain_id"]): int(g.get("phase_id", 0)) for g in snap0["grains"]}
        _p("File:", json_path)
        _p("Number of snapshots:", n_snaps)
        _p("Will update snapshots:", list(range(1, n_snaps)))
        # -----------------------------
        # Process each snapshot INC = 1..last
        # -----------------------------
        # optionally wrap iteration with tqdm progress bar
        it = range(1, n_snaps)
        if progress:
            it = tqdm(it, total=max(n_snaps - 1, 0), desc="Segmenting snapshots", leave=True)
        for inc in it:
            _p("\n" + "=" * 90)
            _p(f"=== PROCESS SNAPSHOT {inc} time={micro[inc].get('time', None)} ===")
            _p("=" * 90)
            snap = micro[inc]
            voxels = snap.get("voxels", None)
            if voxels is None:
                raise KeyError(f"Snapshot {inc} missing 'voxels'.")
            N = len(voxels)
            if N == 0:
                _p(f"Snapshot {inc}: empty voxels; skipping.")
                continue
            # -----------------------------------------------------------------------------
            # STEP 1 — BUILD VOXEL GRAPH (6-neigh) + MISORIENTATION EDGE WEIGHTS (ORIX)
            # -----------------------------------------------------------------------------
            _p("\n=== STEP 1: VOXEL GRAPH + MISORIENTATION (ORIX) ===")
            idx_to_pos = {}  # (i,j,k) voxel index -> position in flat arrays
            eulers = np.empty((N, 3), dtype=float)
            voxel_vol = np.empty(N, dtype=float)
            old_gid = np.empty(N, dtype=int)
            for pos, v in enumerate(voxels):
                ijk = tuple(int(x) for x in v["voxel_index"])  # 1-based indices
                idx_to_pos[ijk] = pos
                eulers[pos, :] = np.asarray(v["orientation"], dtype=float)
                voxel_vol[pos] = float(v["voxel_volume"])
                old_gid[pos] = int(v["grain_id"])
            # build 6-neighbor edges; each undirected edge stored once (nb_pos > pos)
            edges_pos_u = []
            edges_pos_v = []
            for (i, j, k), pos in idx_to_pos.items():
                nbs = [
                    (i - 1, j, k), (i + 1, j, k),
                    (i, j - 1, k), (i, j + 1, k),
                    (i, j, k - 1), (i, j, k + 1),
                ]
                for nb in nbs:
                    nb_pos = idx_to_pos.get(nb, None)
                    if nb_pos is None:
                        continue
                    if nb_pos > pos:  # store undirected edge once
                        edges_pos_u.append(pos)
                        edges_pos_v.append(nb_pos)
            edges_pos_u = np.asarray(edges_pos_u, dtype=int)
            edges_pos_v = np.asarray(edges_pos_v, dtype=int)
            E = len(edges_pos_u)
            _p("N voxels:", N)
            _p("E edges (6-neigh undirected):", E)
            cs = symmetry.Oh  # cubic m-3m
            # NOTE(review): assumes all phases are cubic; confirm for multi-phase data
            O = Orientation.from_euler(eulers, symmetry=cs, direction="lab2crystal", degrees=False)
            # per-edge misorientation angle in degrees
            w_deg = np.asarray(O[edges_pos_u].angle_with(O[edges_pos_v], degrees=True), dtype=float)
            _p("Misorientation (deg): min/mean/max =",
               float(w_deg.min()), float(w_deg.mean()), float(w_deg.max()))
            _p("=== STEP 1 COMPLETE ===")
            # -----------------------------------------------------------------------------
            # STEP 2 — INITIAL CUT + CONNECTED COMPONENTS => INITIAL SEGMENTS
            # -----------------------------------------------------------------------------
            _p("\n=== STEP 2: INITIAL CUT SEGMENTATION ===")
            _p("TH_CUT (deg):", th_cut_deg)
            keep = (w_deg <= th_cut_deg)
            u = edges_pos_u[keep]
            v = edges_pos_v[keep]
            _p("edges kept:", int(keep.sum()), "edges cut:", int((~keep).sum()))
            # symmetric sparse adjacency from kept edges, then connected components
            rows = np.concatenate([u, v])
            cols = np.concatenate([v, u])
            vals = np.ones(rows.shape[0], dtype=np.int8)
            A = coo_matrix((vals, (rows, cols)), shape=(N, N))
            n_seg, labels = connected_components(A, directed=False, connection="weak")
            sizes = np.bincount(labels, minlength=n_seg)
            sizes_sorted = np.sort(sizes)[::-1]
            _p("n_initial_segments:", int(n_seg))
            _p("largest 10 sizes:", sizes_sorted[:10].tolist())
            _p("smallest 10 sizes:", sizes_sorted[-10:].tolist())
            _p(f"segments with <= {tiny_cutoff} voxels:", int((sizes <= tiny_cutoff).sum()))
            _p("=== STEP 2 COMPLETE ===")
            # -----------------------------------------------------------------------------
            # STEP 3 — BUILD SEGMENT-RAG + MEAN ORIENTATIONS (ORIX)
            # -----------------------------------------------------------------------------
            _p("\n=== STEP 3: BUILD SEGMENT-RAG ===")
            _p("segments:", int(n_seg), "labels min/max:", int(labels.min()), int(labels.max()))
            seg_size = sizes.astype(int)
            # ORIX mean orientation per initial segment
            seg_mean_O = [None] * n_seg
            for s in range(n_seg):
                mask = (labels == s)
                seg_mean_O[s] = O[mask].mean()
            # Boundary aggregation using voxel edges
            rag_contact = {}  # (sa,sb)->int
            rag_miso_sum = {}  # (sa,sb)->float (voxel-edge miso sum)
            for i in range(E):
                a = int(labels[int(edges_pos_u[i])])
                b = int(labels[int(edges_pos_v[i])])
                if a == b:
                    continue
                if a > b:
                    a, b = b, a  # canonical key order (small, large)
                key = (a, b)
                rag_contact[key] = rag_contact.get(key, 0) + 1
                rag_miso_sum[key] = rag_miso_sum.get(key, 0.0) + float(w_deg[i])
            rag_miso_mean = {k: rag_miso_sum[k] / rag_contact[k] for k in rag_contact}
            _p("segment-RAG edges:", len(rag_contact))
            if print_topk_rag > 0:
                top_edges = sorted(rag_contact.items(), key=lambda kv: kv[1], reverse=True)[:print_topk_rag]
                _p(f"\nTop {print_topk_rag} neighbors by boundary contact count:")
                for (sa, sb), cnt in top_edges:
                    _p(f" seg {sa} -- seg {sb} : contacts={cnt}, boundary_mean_miso_deg={rag_miso_mean[(sa, sb)]:.3f}")
            _p("=== STEP 3 COMPLETE ===")
            # -----------------------------------------------------------------------------
            # STEP 4 — MERGE / PRUNE USING UNION-FIND
            # Rule A: merge neighbors if mean-orientation miso <= th_merge_deg
            # Rule B: absorb tiny components (<= min_voxels) into best neighbor
            # -----------------------------------------------------------------------------
            _p("\n=== STEP 4: MERGE / PRUNE SEGMENTS ===")
            _p("TH_MERGE (deg):", th_merge_deg, "MIN_VOXELS:", min_voxels)
            parent = np.arange(n_seg, dtype=int)
            rank = np.zeros(n_seg, dtype=int)
            # Rule A (UPDATED: require BOTH mean-orientation and boundary mean miso)
            merge_A = 0
            for (sa, sb) in rag_contact.keys():
                # 1) mean-orientation misorientation (deg)
                miso_meanO = float(
                    np.asarray(seg_mean_O[sa].angle_with(seg_mean_O[sb], degrees=True)).squeeze()
                )
                # 2) boundary mean misorientation (deg)
                boundary_mean_miso = float(rag_miso_mean[(sa, sb)])
                # Merge only if BOTH are small
                if (miso_meanO <= th_merge_deg) and (boundary_mean_miso <= th_merge_deg):
                    if _uf_union(parent, rank, sa, sb):
                        merge_A += 1
            _p("Rule A merges performed:", merge_A)
            # Build root members after Rule A
            root_members = {}
            for s in range(n_seg):
                r = _uf_find(parent, s)
                root_members.setdefault(r, []).append(s)
            # Root sizes
            root_size = {r: int(sum(seg_size[s] for s in members)) for r, members in root_members.items()}
            tiny_roots = [r for r, sz in root_size.items() if sz <= min_voxels]
            _p("tiny components after Rule A:", len(tiny_roots), "roots:", tiny_roots)
            # Root mean orientation (ORIX) from voxel sets
            root_mean_O = {}
            for r, members in root_members.items():
                mask = np.isin(labels, members)
                root_mean_O[r] = O[mask].mean()
            # Rule B: absorb each tiny root
            merge_B = 0
            for r0 in tiny_roots:
                r = _uf_find(parent, r0)  # update in case previous unions changed it
                if r not in root_size:
                    continue
                if root_size[r] > min_voxels:
                    continue
                # pick neighbor root with smallest root-mean miso; ties broken by contact count
                best = None  # (miso_root_deg, -contact, nbr_root)
                for (sa, sb), contact in rag_contact.items():
                    ra = _uf_find(parent, sa)
                    rb = _uf_find(parent, sb)
                    if ra == rb:
                        continue
                    if ra != r and rb != r:
                        continue
                    nbr = rb if ra == r else ra
                    miso_root = float(np.asarray(root_mean_O[r].angle_with(root_mean_O[nbr], degrees=True)).squeeze())
                    score = (miso_root, -int(contact), nbr)
                    if (best is None) or (score < best):
                        best = score
                if best is None:
                    continue
                _, _, nbr_root = best
                if _uf_union(parent, rank, r, nbr_root):
                    merge_B += 1
                    # Update bookkeeping (sizes + mean orientations) for stability
                    newr = _uf_find(parent, r)
                    oldr = nbr_root if newr == r else r  # whichever got absorbed
                    root_size[newr] = root_size.get(r, 0) + root_size.get(nbr_root, 0)
                    root_size.pop(oldr, None)
                    # Recompute mean orientation for new root (safe and correct)
                    # (voxel mask uses ORIGINAL segment labels; rebuild members by scanning is OK at this scale)
                    members_new = [s for s in range(n_seg) if _uf_find(parent, s) == newr]
                    mask_new = np.isin(labels, members_new)
                    root_mean_O[newr] = O[mask_new].mean()
            _p("Rule B merges performed:", merge_B)
            # Finalize merged voxel labels (compact 0..n_final-1)
            root_to_new = {}
            labels_merged = np.empty_like(labels)
            next_id = 0
            for pos in range(N):
                s = int(labels[pos])
                r = _uf_find(parent, s)
                if r not in root_to_new:
                    root_to_new[r] = next_id
                    next_id += 1
                labels_merged[pos] = root_to_new[r]
            n_final = next_id
            final_sizes = np.bincount(labels_merged, minlength=n_final)
            _p("\nAfter merging:")
            _p("n_final_segments:", n_final)
            _p("sum(final_sizes) == N:", int(final_sizes.sum()), "==", N)
            _p("sorted final sizes:", np.sort(final_sizes)[::-1].tolist())
            _p("segments with <= 5 voxels:", int((final_sizes <= 5).sum()))
            _p("=== STEP 4 COMPLETE ===")
            # -----------------------------------------------------------------------------
            # STEP 5 — VALIDATE (post-merge boundary diagnostic)
            # -----------------------------------------------------------------------------
            _p("\n=== STEP 5: POST-MERGE RAG VALIDATION ===")
            rag2_contact = {}
            rag2_sum = {}
            for i in range(E):
                a = int(labels_merged[int(edges_pos_u[i])])
                b = int(labels_merged[int(edges_pos_v[i])])
                if a == b:
                    continue
                if a > b:
                    a, b = b, a
                key = (a, b)
                rag2_contact[key] = rag2_contact.get(key, 0) + 1
                rag2_sum[key] = rag2_sum.get(key, 0.0) + float(w_deg[i])
            rag2_mean = {k: rag2_sum[k] / rag2_contact[k] for k in rag2_contact}
            _p("post-merge RAG edges:", len(rag2_contact))
            if print_topk_rag > 0:
                top2 = sorted(rag2_contact.items(), key=lambda kv: kv[1], reverse=True)[:print_topk_rag]
                _p(f"\nTop {print_topk_rag} post-merge neighbors by contact:")
                for (a, b), cnt in top2:
                    _p(f" seg {a} -- seg {b} : contacts={cnt}, boundary_mean_miso_deg={rag2_mean[(a,b)]:.3f}")
            _p("=== STEP 5 COMPLETE ===")
            # -----------------------------------------------------------------------------
            # STEP 6/7 — UPDATE SNAPSHOT: voxels[*].grain_id + rebuild grains[]
            # -----------------------------------------------------------------------------
            _p("\n=== STEP 6/7: UPDATE JSON (voxels + grains) ===")
            # Write new grain_id (1-based) into voxels
            new_gid = labels_merged.astype(int) + 1
            for pos in range(N):
                voxels[pos]["grain_id"] = int(new_gid[pos])
            n_grains_out = int(new_gid.max())
            _p("n_final grains:", n_grains_out)
            _p("first 10 new grain_id:", new_gid[:10].tolist())
            if progress:
                try:
                    it.set_postfix(grains=n_grains_out, refresh=True)
                except Exception:
                    pass
            # Rebuild grains[]: volume, ORIX mean orientation, majority-vote phase_id
            new_grains = []
            for k in range(1, n_grains_out + 1):
                mask = (new_gid == k)
                if not np.any(mask):
                    continue
                gv = float(voxel_vol[mask].sum())
                Omean = O[mask].mean()
                e_mean = np.asarray(Omean.to_euler(), dtype=float).reshape(3)
                # phase_id: majority vote of the old grain ids' phases (snapshot 0 mapping)
                phases = [grainid_to_phase[int(og)] for og in old_gid[mask] if int(og) in grainid_to_phase]
                phase_id = Counter(phases).most_common(1)[0][0] if phases else 0
                new_grains.append({
                    "grain_id": int(k),
                    "phase_id": int(phase_id),
                    "grain_volume": gv,
                    "orientation": [float(e_mean[0]), float(e_mean[1]), float(e_mean[2])],
                })
            new_grains.sort(key=lambda g: g["grain_id"])
            snap["grains"] = new_grains
            _p("rebuilt grains[] -> n_grains:", len(new_grains))
            _p("total grain_volume:", float(sum(g["grain_volume"] for g in new_grains)))
            _p("=== STEP 6/7 COMPLETE ===")
        # -----------------------------
        # Overwrite the same JSON file
        # -----------------------------
        _p("\n=== WRITE JSON (IN-PLACE) ===")
        with open(json_path, "w") as f:
            json.dump(data, f, indent=2)
        _p("Updated in-place:", json_path)
    finally:
        if log_fh is not None:
            log_fh.close()
[docs]
def step2_overlap_counts(gid_t: np.ndarray, gid_tp1: np.ndarray, verbose: bool = True, log_fh=None,):
    """
    Step 2: Compute overlap counts C(g_t, g_tp1).

    Parameters
    ----------
    gid_t, gid_tp1 : np.ndarray, shape (N,)
        Aligned grain-id arrays for snapshots t and t+1 (positive integer IDs;
        row/column 0 of the result is unused).
    verbose : bool
        If True, prints overlap statistics and match summaries.
    log_fh : file object, optional
        If given, verbose output is written to this file handle instead of stdout.

    Returns
    -------
    C : np.ndarray, shape (Gt+1, Gtp1+1)
        Overlap count matrix where C[i,j] = number of voxels shared.
    row_sum : np.ndarray
        Total voxels per grain in t (row sums).
    col_sum : np.ndarray
        Total voxels per grain in t+1 (col sums).
    pairs : list[tuple]
        All (count, i, j) overlaps with count > 0, sorted by count (descending).
    """
    def _p(msg: str):
        if not verbose:
            return
        if log_fh is not None:
            print(msg, file=log_fh, flush=True)
        else:
            print(msg)
    if gid_t.shape != gid_tp1.shape:
        raise ValueError("gid arrays must have same shape")
    Gt = int(gid_t.max())
    Gp = int(gid_tp1.max())
    # Fast accumulation (vectorized via bincount on combined index)
    # combined_index = i*(Gp+1) + j
    combined = gid_t.astype(np.int64) * (Gp + 1) + gid_tp1.astype(np.int64)
    bc = np.bincount(combined, minlength=(Gt + 1) * (Gp + 1))
    C = bc.reshape((Gt + 1, Gp + 1)).astype(np.int32)
    row_sum = C.sum(axis=1)  # voxels per grain at t
    col_sum = C.sum(axis=0)  # voxels per grain at t+1
    # Extract top overlaps ignoring row/col 0 — vectorized instead of a nested
    # Python loop over the full (Gt x Gp) matrix; argwhere yields row-major
    # order, and the stable sort preserves it among equal counts, so the
    # result is identical to the former double loop.
    nz = np.argwhere(C[1:, 1:] > 0) + 1
    pairs = [(int(C[i, j]), int(i), int(j)) for i, j in nz]
    pairs.sort(reverse=True, key=lambda x: x[0])
    _p("\n=== STEP 2: OVERLAP MATRIX SUMMARY ===")
    _p(f"Overlap matrix shape : {C.shape} (row 0 / col 0 unused)")
    _p(f"Total overlapping voxels : {int(C.sum()):,}")
    _p(f"Grains @ t (nonzero rows) : {(row_sum[1:] > 0).sum()} | Max grain ID: {Gt}")
    _p(f"Grains @ t+1 (nonzero cols) : {(col_sum[1:] > 0).sum()} | Max grain ID: {Gp}")
    _p(f"\n Overlaps (count, g_t -> g_t+1, frac_of_t, frac_of_t+1):")
    for c, i, j in pairs:
        frac_t = c / row_sum[i] if row_sum[i] else 0.0
        frac_p = c / col_sum[j] if col_sum[j] else 0.0
        _p(f" {c:4d} {i:2d} → {j:2d} frac_t={frac_t:6.3f} frac_tp1={frac_p:6.3f}")
    _p("\nBest match per grain in t:")
    for i in range(1, len(row_sum)):
        if row_sum[i] == 0:
            continue
        j_best = int(np.argmax(C[i, :]))
        c_best = int(C[i, j_best])
        frac_t = c_best / row_sum[i]
        frac_p = c_best / col_sum[j_best] if col_sum[j_best] else 0.0
        _p(f" g_t={i:2d}: best g_t+1={j_best:2d} count={c_best:4d} frac_t={frac_t:6.3f} frac_p={frac_p:6.3f}")
    return C, row_sum, col_sum, pairs
[docs]
def step3_track_from_overlap(C, row_sum, col_sum, *,
                             tau_parent=0.30,  # child must come >=30% from a parent to count as parent
                             tau_child=0.30,  # parent must send >=30% of its mass to child to count as child
                             allow_multi_parent=True,
                             verbose: bool = True,
                             log_fh=None,):
    """
    Build tracking relations between two snapshots from the overlap matrix.

    Returns
    -------
    parents_of_child : dict[int, list[tuple[int, float, float, int]]]
        child j -> list of (parent i, frac_of_parent, frac_of_child, count)
    best_parent : dict[int, int]
        child j -> parent i with max overlap
    best_child : dict[int, int]
        parent i -> child j with max overlap
    events : dict[str, list]
        merge/split/birth/death/continue records
    """
    def _say(msg: str):
        if verbose:
            if log_fh is None:
                print(msg)
            else:
                print(msg, file=log_fh, flush=True)

    n_par = C.shape[0] - 1  # number of grain IDs at t
    n_chd = C.shape[1] - 1  # number of grain IDs at t+1

    # Collect candidate links, recorded from both perspectives
    parents_of_child = defaultdict(list)
    children_of_parent = defaultdict(list)
    for i in range(1, n_par + 1):
        if not row_sum[i]:
            continue
        for j in range(1, n_chd + 1):
            cnt = int(C[i, j])
            if not cnt:
                continue
            f_parent = cnt / row_sum[i]
            f_child = cnt / col_sum[j] if col_sum[j] else 0.0
            # keep links that are meaningful from either side
            if f_parent >= tau_child or f_child >= tau_parent:
                parents_of_child[j].append((i, f_parent, f_child, cnt))
                children_of_parent[i].append((j, f_parent, f_child, cnt))

    # Best matches by raw overlap count (id 0 means "no match")
    best_parent = {}
    for j in range(1, n_chd + 1):
        winner = int(np.argmax(C[:, j]))
        best_parent[j] = winner if winner else None
    best_child = {}
    for i in range(1, n_par + 1):
        winner = int(np.argmax(C[i, :]))
        best_child[i] = winner if winner else None

    events = {"continue": [], "merge": [], "split": [], "birth": [], "death": []}

    # Births: children with voxels but no meaningful parent link
    for j in range(1, n_chd + 1):
        if col_sum[j] and j not in parents_of_child:
            events["birth"].append(j)
    # Deaths: parents with voxels but no meaningful child link
    for i in range(1, n_par + 1):
        if row_sum[i] and i not in children_of_parent:
            events["death"].append(i)

    # Continuations and merges, seen from the child side
    for j, plist in parents_of_child.items():
        ranked = sorted(plist, key=lambda x: x[3], reverse=True)  # by overlap desc
        if len(ranked) == 1:
            i = ranked[0][0]
            # mutual best match -> strong continuation candidate
            if best_child.get(i, None) == j and best_parent.get(j, None) == i:
                events["continue"].append((i, j, ranked[0]))
        elif allow_multi_parent:
            # multiple parents -> merge
            events["merge"].append(([p[0] for p in ranked], j, ranked))

    # Splits, seen from the parent side: several strong-from-parent children
    for i, clist in children_of_parent.items():
        strong = [x for x in clist if x[1] >= tau_child]
        if len(strong) >= 2:
            strong = sorted(strong, key=lambda x: x[3], reverse=True)
            events["split"].append((i, [x[0] for x in strong], strong))

    _say("\n=== STEP 3: TRACKING EVENT SUMMARY ===")
    _say(f"Continues : {len(events['continue'])}")
    _say(f"Merges : {len(events['merge'])}")
    _say(f"Splits : {len(events['split'])}")
    _say(f"Births : {len(events['birth'])}")
    _say(f"Deaths : {len(events['death'])}")
    return parents_of_child, best_parent, best_child, events
[docs]
def step4_assign_tracked_ids(
        pairs, C, row_sum, col_sum,
        events,
        parents_of_child,
        verbose: bool = True,
        log_fh=None,):
    """
    Step 4: Assign tracked grain IDs at t+1 using Hungarian matching and event logic.

    Parameters
    ----------
    pairs : list of tuples
        Each tuple is (count, grain_t, grain_t+1), sorted by overlap.
    C : np.ndarray
        Overlap matrix where C[i, j] is the count of voxels shared between grain i at t and grain j at t+1.
    row_sum : np.ndarray
        Total voxel count per grain at t.
    col_sum : np.ndarray
        Total voxel count per grain at t+1.
    events : dict[str, list]
        Event records from tracking: contains 'split', 'merge', 'birth', etc.
    parents_of_child : dict[int, list[tuple]]
        Mapping from child grain ID at t+1 to list of parent grain contributions from t.
    verbose : bool, optional
        Whether to print detailed summary and logs (default: True).
    log_fh : file object, optional
        If given, verbose output is written to this file handle instead of stdout.

    Returns
    -------
    final_ids : dict[int, int]
        Mapping of grain_id at t+1 → tracked global grain ID.

    Notes
    -----
    IDs are assigned in priority order: Hungarian matches, then splits,
    merges, births, and finally a fallback for any still-unassigned child.
    Unmatched children receive fresh IDs counting up from max(grain_id at t) + 1.
    """
    def _p(msg: str):
        # print helper: honors `verbose` and redirects to the log file if open
        if not verbose:
            return
        if log_fh is not None:
            print(msg, file=log_fh, flush=True)
        else:
            print(msg)
    # Step 1: extract unique grain IDs appearing in the overlap pairs
    grains_t = sorted(set(i for _, i, _ in pairs))
    grains_tp1 = sorted(set(j for _, _, j in pairs))
    id_to_row = {gid: idx for idx, gid in enumerate(grains_t)}
    id_to_col = {gid: idx for idx, gid in enumerate(grains_tp1)}
    # Step 2: build cost matrix for Hungarian matching
    # 10.0 marks "no overlap"; real overlaps get negative cost so that
    # linear_sum_assignment maximizes total overlap
    M = np.full((len(grains_t), len(grains_tp1)), fill_value=10.0)
    for c, i, j in pairs:
        M[id_to_row[i], id_to_col[j]] = -c  # maximize overlap
    row_idx, col_idx = linear_sum_assignment(M)
    # Step 3: extract assigned pairs (cost < 1.0 filters out the 10.0 dummies)
    assigned_pairs = []
    for r, c in zip(row_idx, col_idx):
        i = grains_t[r]
        j = grains_tp1[c]
        cost = M[r, c]
        if cost < 1.0:
            assigned_pairs.append((i, j, cost))
    _p("\n=== STEP 4: Hungarian Matches ===")
    for i, j, cost in assigned_pairs:
        _p(f" {i}@t → {j}@t+1 | cost={cost:.3f}")
    final_ids = {}       # child grain id at t+1 -> tracked global id
    assigned_ids = set() # tracked ids already handed out (no duplicates)
    max_gid_t = max(grains_t)
    next_id = max_gid_t + 1  # counter for freshly created tracked ids
    # 1. Assign IDs from Hungarian matches
    for i, j, _ in assigned_pairs:
        if j not in final_ids and i not in assigned_ids:
            final_ids[j] = i
            assigned_ids.add(i)
    # 2. Splits: largest child inherits the parent id, the rest get new ids
    for i, children, info in events.get("split", []):
        children = sorted(children, key=lambda j: C[i, j], reverse=True)
        for idx, j in enumerate(children):
            if j in final_ids:
                continue
            if i not in assigned_ids:
                final_ids[j] = i
                assigned_ids.add(i)
            else:
                final_ids[j] = next_id
                assigned_ids.add(next_id)
                next_id += 1
    # 3. Merges: child inherits the dominant parent's id if still free
    for parents, j, info in events.get("merge", []):
        if j in final_ids:
            continue
        dominant_i = max(info, key=lambda x: x[3])[0]
        if dominant_i not in assigned_ids:
            final_ids[j] = dominant_i
            assigned_ids.add(dominant_i)
        else:
            final_ids[j] = next_id
            assigned_ids.add(next_id)
            next_id += 1
    # 4. Births: brand-new grains get fresh ids
    for j in events.get("birth", []):
        if j not in final_ids:
            final_ids[j] = next_id
            assigned_ids.add(next_id)
            next_id += 1
    # 5. Fallback: any remaining child gets a fresh id
    for j in grains_tp1:
        if j not in final_ids:
            final_ids[j] = next_id
            assigned_ids.add(next_id)
            next_id += 1
    # === Summary Output ===
    _p("\n=== Final Tracked IDs at t+1 ===")
    for j in sorted(final_ids):
        _p(f" Grain {j}@t+1 → Tracked ID {final_ids[j]}")
    _p("\n=== Parent Summary ===")
    for j in sorted(grains_tp1):
        parents = parents_of_child.get(j, [])
        tracked_id = final_ids[j]
        if not parents:
            status = "birth"
            parent_str = "-"
        elif len(parents) == 1:
            status = "1 parent"
            parent_str = f"{parents[0][0]}"
        else:
            status = f"{len(parents)} parents (merge)"
            parent_str = ", ".join(str(p[0]) for p in parents)
        _p(f" Grain {j}@t+1 → ID {tracked_id}, {status}, from parent(s): {parent_str}")
    # Duplicates check: each tracked id should be used at most once
    duplicates = [k for k, v in Counter(final_ids.values()).items() if v > 1]
    if duplicates:
        _p(f"\n⚠️ Duplicated tracked IDs at t+1: {duplicates}")
    else:
        _p("\n✅ No duplicated tracked IDs at t+1")
    return final_ids
[docs]
def step5_update_tracked_ids_in_json(
        json_path, t,
        final_ids,
        parents_of_child,
        verbose: bool = True,
        log_fh=None,):
    """
    Step 5: Update JSON file with new tracked grain IDs and parent relationships.

    Parameters
    ----------
    json_path : str
        Path to the microstructure JSON file.
    t : int
        Snapshot index to update (this modifies snapshot at t+1).
    final_ids : dict[int, int]
        Mapping of grain_id at t+1 to globally tracked IDs.
    parents_of_child : dict[int, list[tuple]]
        Mapping from child grain ID to parent grain info (from t).
    verbose : bool, optional
        Whether to print a confirmation message (default: True).
    log_fh : file object, optional
        If given, verbose output is written to this file handle instead of stdout.
    """
    def _p(msg: str):
        if not verbose:
            return
        if log_fh is not None:
            print(msg, file=log_fh, flush=True)
        else:
            print(msg)

    with open(json_path, 'r') as fh:
        payload = json.load(fh)
    snapshot = payload["microstructure"][t + 1]  # we are updating t+1

    # --- Update grains: relabel ids and attach parent relationships ---
    relabeled = []
    for grain in snapshot["grains"]:
        gid = grain["grain_id"]
        if gid not in final_ids:
            continue  # drop grains without a tracked id
        entry = dict(grain)
        entry["grain_id"] = final_ids[gid]
        plist = parents_of_child.get(gid, [])
        if len(plist) == 1:
            entry["parent_grain_id"] = plist[0][0]
        elif plist:
            # several parents: list them by descending overlap count
            ordered = sorted(plist, key=lambda x: x[3], reverse=True)
            entry["parent_grain_id"] = [p[0] for p in ordered]
        relabeled.append(entry)
    snapshot["grains"] = relabeled

    # --- Update voxels with the tracked ids ---
    for voxel in snapshot["voxels"]:
        gid = voxel["grain_id"]
        if gid in final_ids:
            voxel["grain_id"] = final_ids[gid]

    # --- Save back to file ---
    with open(json_path, 'w') as fh:
        json.dump(payload, fh, indent=2)
    _p(f"✅ Snapshot {t+1} successfully updated and saved to: {json_path}")
[docs]
def track_all_grains_across_snapshots(
        json_path: str,
        *,
        log_path: str | None = None,
        progress: bool = False,
        verbose: bool = True,) -> None:
    """
    Run the complete grain-tracking pipeline (steps 1-5) over every
    consecutive snapshot pair in the microstructure JSON, rewriting the
    file in place so grain IDs propagate from each snapshot to the next.

    Parameters
    ----------
    json_path : str
        Path to the input JSON file containing microstructure data.
    log_path : str or None, optional
        If given, verbose output is appended to this file instead of stdout.
    progress : bool, optional
        Show a tqdm progress bar over snapshot pairs (requires tqdm).
    verbose : bool, optional
        Emit per-step diagnostic messages.

    Raises
    ------
    ImportError
        If ``progress=True`` and tqdm is not installed.
    """
    if progress:
        try:
            from tqdm.auto import tqdm
        except Exception as e:
            raise ImportError(
                "progress=True requires tqdm. Install it via `pip install tqdm` or disable progress."
            ) from e
    log_fh = open(log_path, "a", encoding="utf-8") if log_path is not None else None

    def _p(*args):
        # Diagnostics go to the log file when one is open, else stdout.
        if not verbose:
            return
        if log_fh is None:
            print(*args)
        else:
            print(*args, file=log_fh, flush=True)

    try:
        with open(json_path, 'r') as fh:
            num_snapshots = len(json.load(fh)["microstructure"])
        _p(f"📦 Starting grain tracking on {num_snapshots} snapshots...")
        # Iterate over consecutive snapshot pairs (t -> t+1).
        pair_iter = range(num_snapshots - 1)
        if progress:
            pair_iter = tqdm(pair_iter, total=max(num_snapshots - 1, 0), desc="Tracking grain IDs", leave=True)
        for t in pair_iter:
            _p(f"\n\n🚩 Processing snapshot pair: {t} → {t+1}")
            # Step 1: aligned grain-ID arrays for snapshots t and t+1
            gid_t, gid_tp1, idx_list = step1_extract_gid_arrays(json_path, t, verbose=verbose, log_fh=log_fh)
            # Step 2: voxel-overlap counts between the two labelings
            C, row_sum, col_sum, pairs = step2_overlap_counts(gid_t, gid_tp1, verbose=verbose, log_fh=log_fh)
            # Step 3: classify overlaps into continuation/split/merge/birth events
            parents_of_child, best_parent, best_child, events = step3_track_from_overlap(C, row_sum, col_sum,
                                                                                         verbose=verbose, log_fh=log_fh)
            # Step 4: resolve events into globally unique tracked IDs
            final_ids = step4_assign_tracked_ids(pairs, C, row_sum, col_sum, events, parents_of_child,
                                                 verbose=verbose, log_fh=log_fh)
            # Step 5: persist the new IDs for snapshot t+1 back into the JSON
            step5_update_tracked_ids_in_json(json_path, t, final_ids, parents_of_child, verbose=verbose, log_fh=log_fh)
            if progress:
                # Best-effort progress-bar annotation; never fail the run over it.
                try:
                    pair_iter.set_postfix(
                        grains_t=int(gid_t.max()) if gid_t.size else 0,
                        grains_tp1=int(gid_tp1.max()) if gid_tp1.size else 0,
                        merges=len(events.get("merge", [])),
                        splits=len(events.get("split", [])),
                        refresh=True,
                    )
                except Exception:
                    pass
        _p("\n✅ Grain tracking and ID propagation completed across all snapshots.")
    finally:
        # Always release the log file, even if a pipeline step raised.
        if log_fh is not None:
            log_fh.close()
[docs]
def add_ipf_color(
    json_path: str | Path,
    l: Sequence[float] = (1, 0, 0),
    *,
    out_path: str | Path | None = None,
) -> Path:
    """
    Annotate every voxel in every snapshot of the JSON with an IPF RGB color.

    The Laue group is read from
    ``data["phases"][0]["orientation"]["crystal_symmetry_group"]`` (only
    "m-3m" is supported), the IPF color is computed along the lab direction
    ``l``, and each voxel gains a key of the form
    ``"IPFcolor_(l0 l1 l2)"`` holding an 8-bit ``[R, G, B]`` list.

    Parameters
    ----------
    json_path : str or Path
        Input JSON file; voxel orientations are Euler angles
        [phi1, Phi, phi2] in radians.
    l : sequence of float, optional
        Non-zero lab direction for the IPF key, default (1, 0, 0).
    out_path : str, Path, or None, optional
        Where to write the annotated JSON; defaults to overwriting
        ``json_path``.

    Returns
    -------
    Path
        The path the annotated JSON was written to.

    Raises
    ------
    ValueError
        On multi-phase data, unsupported symmetry, a degenerate ``l``,
        or malformed voxel orientations.
    """
    from orix import plot
    from orix.quaternion import Orientation, symmetry
    from orix.vector import Vector3d

    src = Path(json_path)
    dst = src if out_path is None else Path(out_path)
    data: dict[str, Any] = json.loads(src.read_text(encoding="utf-8"))

    # Single-phase assumption: exactly one entry in data["phases"].
    phases = data["phases"]
    if not isinstance(phases, list) or len(phases) != 1:
        raise ValueError(f'Expected one phase in data["phases"], got {type(phases)} with len={len(phases) if isinstance(phases, list) else "?"}.')
    sym_str = phases[0]["orientation"]["crystal_symmetry_group"]
    # Only cubic m-3m is mapped for now.
    if sym_str != "m-3m":
        raise ValueError(f'Unsupported symmetry "{sym_str}" in this simplified function. Expected "m-3m".')
    laue_sym = symmetry.Oh  # cubic m-3m

    # Validate the sample direction and build the TSL IPF color key.
    l = np.asarray(l, dtype=float)
    if l.shape != (3,) or np.allclose(l, 0.0):
        raise ValueError("`l` must be a non-zero length-3 vector, e.g. (1,0,0).")
    color_key = plot.IPFColorKeyTSL(symmetry=laue_sym, direction=Vector3d(l))
    # Voxel key label, e.g. "IPFcolor_(1 0 0)".
    ipf_key = f"IPFcolor_({int(l[0])} {int(l[1])} {int(l[2])})"

    # Color all voxels of each snapshot in one vectorized pass.
    for snap in data["microstructure"]:
        voxels = snap.get("voxels", [])
        if not voxels:
            continue
        eulers = np.asarray([v["orientation"] for v in voxels], dtype=float)
        if eulers.ndim != 2 or eulers.shape[1] != 3:
            raise ValueError("Each voxel['orientation'] must be a length-3 Euler list.")
        oris = Orientation.from_euler(eulers, symmetry=laue_sym, degrees=False)
        # RGB floats in [0, 1] -> rounded 8-bit channels.
        rgb8 = np.clip(np.rint(color_key.orientation2color(oris) * 255.0), 0, 255).astype(np.uint8)
        for vox, col in zip(voxels, rgb8):
            vox[ipf_key] = col.tolist()

    dst.write_text(json.dumps(data, indent=2), encoding="utf-8")
    return dst