Source code for snudda.place.place

# snudda_place.py
#
# Johannes Hjorth, Royal Institute of Technology (KTH)
# Human Brain Project 2019
#
# This open source software code was developed in part or in whole in
# the Human Brain Project, funded from the European Union’s Horizon
# 2020 Framework Programme for Research and Innovation under Specific
# Grant Agreements No. 720270 and No. 785907, No 945539
# (Human Brain Project SGA1, SGA2, SGA3).

# TODO: Let place also decide where separate axonal morphologies should be located, save swc morph, and position + rotation
#       Then detect can just load that info directly. Simplifies load_neuron
#       HDF5 filen, ha en matrix med info för axon morph, axon pos, axon rotation, axon parent

# TODO: We need to define axon_density in meta.json file for neurons that need the axon density
#       Then we need to make sure the code reads this axon density if it exists in meta.json file


import json
import os
import sys

import h5py
import numexpr
import numpy as np
import scipy.cluster
from scipy.interpolate import griddata

from snudda.utils.snudda_path import get_snudda_data
from snudda.neurons.neuron_prototype import NeuronPrototype
# from snudda.place.region_mesh_redux import NeuronPlacer, RegionMeshRedux
from snudda.place.region_mesh_vedo import NeuronPlacer, RegionMesh


from snudda.place.rotation import SnuddaRotate
from snudda.utils.snudda_path import snudda_parse_path, snudda_path_exists, snudda_simplify_path

from snudda.neurons.morphology_data import MorphologyData

''' This code places all neurons in space, but does not setup their
    connectivity. That is done by detect.py and prune.py '''


[docs] class SnuddaPlace(object): """ Places neurons in 3D space. Use detect to add connections, and prune to remove redundant connections. """ def __init__(self, config_file=None, network_path=None, snudda_data=None, verbose=False, log_file=None, rc=None, d_view=None, h5libver=None, random_seed=None, griddata_interpolation=False, morphologies_stay_inside=True): """ Constructor. Args: config_file (str) : Path to config file, e.g. network-config.json in network_path network_path (str) : Path to network directory snudda_data (str): Path to snudda data verbose (bool) : Print extra information on screen log_file (str) : Log file for place rc : ipyparallel remote client d_view : ipyparallel direct view object h5libver : Version of h5py library random_seed (int) : Numpy random seed griddata_interpolation (bool) : Should we interpolate density data (5x slower) """ self.rc = rc self.d_view = d_view if self.rc and not self.d_view: self.d_view = self.rc.direct_view(targets='all') if not config_file and network_path: config_file = os.path.join(network_path, "network-config.json") if not network_path and config_file: network_path = os.path.dirname(config_file) if not log_file and network_path: log_dir = os.path.join(network_path, "log") os.makedirs(log_dir, exist_ok=True) log_file = open(os.path.join(log_dir, "place-neurons.txt"), "w") self.network_path = network_path self.config_file = config_file self.config = None self.morphologies_stay_inside = morphologies_stay_inside self.axon_config_cache = None self.snudda_data = get_snudda_data(snudda_data=snudda_data, config_file=self.config_file, network_path=self.network_path) self.verbose = verbose self.log_file = log_file if self.snudda_data is None: self.write_log(f"Warning, snudda_data is not set!") if self.network_path: self.position_file = os.path.join(self.network_path, "network-neuron-positions.hdf5") else: self.write_log("No network_path given, not setting position_file. Remember to pass it to write_data.") self.position_file = None if h5libver is None: self.h5libver = "latest" else: self.h5libver = h5libver self.write_log(f"Using hdf5 version: {self.h5libver}") self.griddata_interpolation = griddata_interpolation # List of all neurons self.neurons = [] self.neuron_prototypes = {} self.random_seed = random_seed self.random_generator = None self.rotate_helper = None # This defines the neuron units/channels. The dictionary lists all the # members of each unit, the neuronChannel gives the individual neurons # channel membership self.num_population_units = 1 self.population_unit_placement_method = "random" self.population_units = dict([]) self.population_unit = None # These are the dimensions of our space, d_min also needs a "padding" # region outside the space where fake neurons are placed. This is to # avoid boundary effects, without padding we would get too high density # at the edges self.volume = dict([]) def __del__(self): if self.rc: # Cleanup memory on workers from snudda.utils import cleanup cleanup(self.rc, "place") ############################################################################
[docs] def place(self): """ Place neurons in 3D space. """ self.parse_config() if self.morphologies_stay_inside: self.avoid_edges_parallel() self.write_data()
############################################################################
[docs] def write_log(self, text, flush=True, is_error=False, force_print=False): # Change flush to False in future, debug """ Writes to log file. Use setup_log first. Text is only written to screen if self.verbose=True, or is_error = True, or force_print = True. test (str) : Text to write flush (bool) : Should all writes be flushed to disk directly? is_error (bool) : Is this an error, always written. force_print (bool) : Force printing, even if self.verbose=False. """ if self.log_file is not None: self.log_file.write(f"{text}\n") if flush: self.log_file.flush() if self.verbose or is_error or force_print: print(text, flush=True)
def close_log_file(self): if not isinstance(self.log_file, str) and self.log_file: self.log_file.close() self.log_file = None ############################################################################
[docs] def add_neurons(self, swc_path, num_neurons, param_filename=None, mech_filename=None, modulation=None, reaction_diffusion=None, name="Unnamed", hoc=None, volume_id=None, virtual_neuron=False, axon_density=None, parameter_key=None, morphology_key=None, modulation_key=None, config=None, rng=None): """ Add neurons to volume specified. Args: swc_path (str): Path to morphology directory (or single morphology) num_neurons (int): Number of neurons to add param_filename (str): Path to parameter file mech_filename (str): Path to mechanism file modulation (str): Path to neuromodulation file reaction_diffusion (str): Path to RxD reaction diffusion file name (str): Name of neuron population, e.g. DSPN (which will become DSPN_0, DSPN_1, etc...) hoc (str): Path to hoc file (currently disabled) volume_id (str): ID of the volume to place neurons in virtual_neuron (bool): Real or virtual neuron, the latter can be used to model axons giving input to network axon_density (str): Axon density parameter_key (str, optional): Parameter Key to use, default=None (Randomise between all available) morphology_key (str, optional): Morphology Key to use, default=None (Randomise between all available) modulation_key (str, optional): Key for neuromodulation, default=None (Randomise between all available) config (dict): dict representation of network-config.json """ assert volume_id is not None, f"You must specify a volume for neuron {name}" assert hoc is None, "Currently only support hoc=None, since we can have multiple parameter, morph combos now" if rng is None: rng = self.random_generator neuron_prototype = NeuronPrototype(neuron_name=name, neuron_path=None, snudda_data=self.snudda_data, morphology_path=swc_path, parameter_path=param_filename, mechanism_path=mech_filename, modulation_path=modulation, reaction_diffusion_path=reaction_diffusion, load_morphology=False, virtual_neuron=virtual_neuron, verbose=self.verbose) neuron_type = name.split("_")[0] neuron_positions = self.volume[volume_id]["mesh"].place_neurons(num_neurons, neuron_type) first_added = True neuron_rotations = self.rotate_helper.get_rotations(volume_name=volume_id, neuron_type=neuron_type, neuron_positions=neuron_positions, rng=rng) # Are there any projections from this neuron type in the config file? axon_info = self.generate_extra_axon_info(source_neuron=name, position=neuron_positions, config=config, rng=rng) for idx, (coords, rotation) in enumerate(zip(neuron_positions, neuron_rotations)): # We set loadMorphology = False, to preserve memory # Only morphology loaded for nm then, to get axon and dend # radius needed for connectivity # Pick a random parameter set # parameter.json can be a list of lists, this allows you to select the # parameter set randomly # modulation.json is similarly formatted, pick a parameter set here if parameter_key is None: parameter_id = rng.integers(1000000) else: parameter_id = None if modulation_key is None: modulation_id = rng.integers(1000000) else: modulation_id = None if morphology_key is None: morphology_id = rng.integers(1000000) else: morphology_id = None n = neuron_prototype.clone(position=coords, rotation=rotation, morphology_id=morphology_id, parameter_id=parameter_id, modulation_id=modulation_id, parameter_key=parameter_key, morphology_key=morphology_key, modulation_key=modulation_key) n.neuron_id = len(self.neurons) n.volume_id = volume_id if axon_density: n.axon_density_type = axon_density[0] n.axon_density = axon_density[1] n.max_axon_radius = axon_density[2] n.axon_density_bounds_xyz = axon_density[2] if axon_info is not None and len(axon_info) > 0: for axon_name, axon_position, axon_rotation, axon_swc in axon_info: n.add_morphology(swc_file=axon_swc[idx], name=axon_name, position=axon_position[idx, :], rotation=axon_rotation[idx, :].reshape((3, 3)), lazy_loading=True) self.neurons.append(n) # This info is used by workers to speed things up if first_added: first_added = False self.neuron_prototypes[n.name] = n
############################################################################
[docs] def parse_config(self, config_file=None, resort_neurons=False): """ Pase network config_file""" if config_file is None: config_file = self.config_file if config_file is None: self.write_log("No config file specified.", is_error=True) raise ValueError("No config file specified.") if not os.path.isfile(config_file): self.write_log(f"Missing config file {config_file}", is_error=True) raise ValueError(f"Missing config file {config_file}") self.write_log(f"Parsing place config file {config_file}") with open(config_file, "r") as f: config = json.load(f) if not config: self.write_log("Warning, empty network config.") if self.random_seed is None: if "random_seed" in config and "place" in config["random_seed"]: self.random_seed = config["random_seed"]["place"] self.write_log(f"Reading random seed from config file: {self.random_seed}") else: self.write_log(f"Using random seed provided by command line: {self.random_seed}") # Setup for rotations self.rotate_helper = SnuddaRotate(self.config_file) if self.random_seed: self.random_generator = np.random.default_rng(self.random_seed + 115) else: self.random_generator = np.random.default_rng() # Setup random seeds for all regions ss = np.random.SeedSequence(self.random_seed) all_seeds = ss.generate_state(len(config["regions"])) for (region_name, region_data), region_seed in zip(config["regions"].items(), all_seeds): total_num_neurons = region_data.get("num_neurons") volume_data = region_data["volume"] self.volume[region_name] = volume_data if "random_seed" not in volume_data: self.volume[region_name]["random_seed"] = region_seed else: region_seed = self.volume[region_name]["random_seed"] region_rnd = np.random.default_rng(region_seed + 123) if snudda_path_exists(volume_data["mesh_file"], self.snudda_data): mesh_file = snudda_parse_path(volume_data["mesh_file"], self.snudda_data) elif os.path.isfile(os.path.join(self.network_path, volume_data["mesh_file"])): mesh_file = os.path.join(self.network_path, volume_data["mesh_file"]) else: raise ValueError(f"Unable to find mesh_file {volume_data['mesh_file']}") if "num_putative_points" in volume_data: num_putative_points = int(volume_data["num_putative_points"]) else: num_putative_points = None self.volume[region_name]["mesh"] \ = NeuronPlacer(mesh_path=mesh_file, d_min=self.volume[region_name]["d_min"], random_seed=region_seed, n_putative_points=num_putative_points) if "density" in self.volume[region_name]: for neuron_type in self.volume[region_name]["density"]: density_func = None if "density_function" in self.volume[region_name]["density"][neuron_type]: density_str = self.volume[region_name]["density"][neuron_type]["density_function"] density_func = lambda x, y, z, d_str=density_str: numexpr.evaluate(d_str) if "density_file" in self.volume[region_name]["density"][neuron_type]: density_file = self.volume[region_name]["density"][neuron_type]["density_file"] # We need to load the data from the file from scipy.interpolate import griddata with open(snudda_parse_path(density_file, self.snudda_data), "r") as f: density_data = json.load(f) assert region_name in density_data and neuron_type in density_data[region_name], \ f"Volume {region_name} does not contain data for neuron type {neuron_type}" assert "coordinates" in density_data[region_name][neuron_type] \ and "density" in density_data[region_name][neuron_type], \ (f"Missing coordinates and/or Density data for " f"volume {region_name}, neuron type {neuron_type}") # Convert to SI (* 1e-6) coord = np.array(density_data[region_name][neuron_type]["coordinates"]) * 1e-6 density = np.array(density_data[region_name][neuron_type]["density"]) if self.griddata_interpolation: density_func = lambda x, y, z, c=coord, d=density: \ griddata(points=c, values=d, xi=np.array([x, y, z]), method="linear", fill_value=0).transpose() else: density_func = lambda x, y, z, c=coord, d=density: \ griddata(points=c, values=d, xi=np.array([x, y, z]), method="nearest", fill_value=0).transpose() self.volume[region_name]["mesh"].define_density(neuron_type, density_func) if "neurons" not in region_data: self.write_log(f"No neurons specified for volume {region_name}") number_of_added_neurons = 0 for neuron_type, neuron_data in region_data["neurons"].items(): model_type = neuron_data.get("neuron_type", "neuron") # default "neuron" # rotation_mode currently not used?! axon_density = neuron_data.get("axon_density") if "num_neurons" in neuron_data: num_neurons = neuron_data["num_neurons"] elif "fraction" in neuron_data: if total_num_neurons is None: raise ValueError(f"If fraction is specified, then total_num_neurons for the region {region_name} must be set") if neuron_data["fraction"] < 0 or neuron_data["fraction"] > 1: raise ValueError(f"{neuron_type}: Neuron 'fraction' must be between 0 and 1.") num_neurons = int(neuron_data["fraction"] * total_num_neurons) else: raise ValueError(f"You need to specify 'fraction' or 'num_neurons' for {neuron_type}") # print(f"{neuron_type = }, {num_neurons = }") n_types = len(neuron_data["neuron_path"]) if isinstance(num_neurons, int): n_neurons = np.full((n_types, ), int(num_neurons/n_types)) extra_n = region_rnd.integers(low=0, high=n_types, size=num_neurons-np.sum(n_neurons)) for en in extra_n: n_neurons[en] += 1 elif len(num_neurons) != n_types: raise ValueError(f"num_neurons can be a scalar or a vector, if it is a vector " f"({num_neurons = }) then its length must equal number of " f"neuron_path:s given ({n_types})") else: n_neurons = num_neurons # RxD reaction diffusion config file default_reaction_diffusion = neuron_data.get("reaction_diffusion") parameter_key_list = SnuddaPlace.replicate_str(neuron_data.get("parameter_key"), n_neurons, f"{neuron_type} parameter_key") morphology_key_list = SnuddaPlace.replicate_str(neuron_data.get("morphology_key"), n_neurons, f"{neuron_type} morphology_key") modulation_key_list = SnuddaPlace.replicate_str(neuron_data.get("modulation_key"), n_neurons, f"{neuron_type} modulation_key") for (neuron_name, neuron_path), num, parameter_key, morphology_key, modulation_key \ in zip(neuron_data["neuron_path"].items(), n_neurons, parameter_key_list, morphology_key_list, modulation_key_list): print(f"{neuron_name = }, {num = }, {neuron_path = }") if neuron_name.split("_")[0] != neuron_type and neuron_name != neuron_type: raise ValueError(f"The keys in neuron_path must be {neuron_name}_X where X is usually a number") morph = os.path.join(neuron_path, "morphology") param = os.path.join(neuron_path, "parameters.json") mech = os.path.join(neuron_path, "mechanisms.json") modulation = os.path.join(neuron_path, "modulation.json") if not snudda_path_exists(modulation, snudda_data=self.snudda_data): modulation = None if default_reaction_diffusion is None: reaction_diffusion = os.path.join(neuron_path, "reaction_diffusion.json") if not snudda_path_exists(reaction_diffusion, snudda_data=self.snudda_data): reaction_diffusion = None else: reaction_diffusion = default_reaction_diffusion if model_type == "virtual": param = None mech = None modulation = None hoc = None virtual_neuron = True else: virtual_neuron = False self.add_neurons(name=neuron_name, swc_path=morph, param_filename=param, mech_filename=mech, modulation=modulation, reaction_diffusion=reaction_diffusion, num_neurons=num, hoc=None, volume_id=region_name, virtual_neuron=virtual_neuron, axon_density=axon_density, parameter_key=parameter_key, morphology_key=morphology_key, modulation_key=modulation_key, config=config, rng=region_rnd) number_of_added_neurons += num if total_num_neurons is not None and number_of_added_neurons > total_num_neurons*1.01: raise ValueError(f"{region_name} should have {total_num_neurons} but {number_of_added_neurons} were added.") self.config_file = config_file self.config = config # We reorder neurons, sorting their IDs after position # -- UPDATE: Now we spatial cluster neurons depending on number of workers if resort_neurons: self.sort_neurons(sort_idx=self.cluster_neurons(rng=region_rnd)) if False: # Debug purposes, make sure neuron ranges are ok self.plot_ranges() self.define_population_units(config)
@staticmethod def get_var_helper(data, key, default_value=None): return data[key] if key in data else default_value ############################################################################ def avoid_edges_parallel(self): ss = np.random.SeedSequence(self.random_seed + 100) neuron_random_seed = ss.generate_state(len(self.neurons)) bend_neuron_info = [] for neuron in self.neurons: neuron_type = neuron.name.split("_")[0] neuron_config = self.config["regions"][neuron.volume_id]["neurons"][neuron_type] if "stay_inside_mesh" in neuron_config and neuron_config["stay_inside_mesh"]: volume_id = neuron_config["volume_id"] mesh_file = self.config["regions"][volume_id]["volume"]["mesh_file"] if isinstance(neuron_config["stay_inside_mesh"], dict): if "k_dist" in neuron_config["stay_inside_mesh"]: k_dist = neuron_config["stay_inside_mesh"]["k_dist"] else: k_dist = 30e-6 if "n_random" in neuron_config["stay_inside_mesh"]: n_random = neuron_config["stay_inside_mesh"]["n_random"] else: n_random = 5 if "max_angle" in neuron_config["stay_inside_mesh"]: max_angle = neuron_config["stay_inside_mesh"]["max_angle"] else: max_angle = 0.1 # radians else: k_dist = 30e-6 n_random = 5 max_angle = 0.1 # radians bend_neuron_info.append((neuron.neuron_id, neuron.name, neuron.swc_filename, neuron.position, neuron.rotation, neuron_random_seed[neuron.neuron_id], volume_id, mesh_file, k_dist, n_random, max_angle)) bend_morph_path = os.path.join(self.network_path, "modified_morphologies") if not os.path.isdir(bend_morph_path): os.mkdir(bend_morph_path) if self.d_view is None: # Make sure we use the same random seeds if we run in serial, as would have been used in parallel modified_neurons = self.avoid_edges_helper(bend_neuron_info=bend_neuron_info, network_path=self.network_path) else: # Make random permutation of neurons, to spread out the edge neurons unsorted_neuron_id = self.random_generator.permutation(len(bend_neuron_info)) bend_neuron_info = [bend_neuron_info[idx] for idx in unsorted_neuron_id] with self.d_view.sync_imports(): from snudda.place import SnuddaPlace self.d_view.scatter("bend_neuron_info", bend_neuron_info, block=True) self.d_view.push({"config_file": self.config_file, "network_path": self.network_path, "snudda_data": self.snudda_data}, block=True) cmd_str = f"sp = SnuddaPlace(config_file=config_file,network_path=network_path,snudda_data=snudda_data)" self.d_view.execute(cmd_str, block=True) cmd_str3 = f"modified_neurons = SnuddaPlace.avoid_edges_helper(bend_neuron_info=bend_neuron_info, network_path=network_path)" self.d_view.execute(cmd_str3, block=True) modified_neurons = self.d_view.gather("modified_neurons", block=True) for neuron_id, new_morphology in modified_neurons: # Replace the original morphology with the warped morphology, morphology includes rotation self.neurons[neuron_id].swc_filename = new_morphology self.neurons[neuron_id].rotation = np.eye(3) @staticmethod def avoid_edges_helper(bend_neuron_info, network_path): # TODO: We need name, swc_file, position, rotation # This needs to be passed, since self.neurons is not pickleable... try: from snudda.place.bend_morphologies import BendMorphologies bend_morph = dict() bend_morph_path = os.path.join(network_path, "modified_morphologies") modified_morphologies = [] for neuron_id, neuron_name, swc_filename, position, rotation, random_seed, volume_id, mesh_file,\ k_dist, n_random, max_angle in bend_neuron_info: if volume_id not in bend_morph: bend_morph[volume_id] = BendMorphologies(region_mesh=mesh_file, rng=None) # Returns None if unchanged new_morph_name = os.path.join(bend_morph_path, f"{neuron_name}-{neuron_id}.swc") new_morphology = bend_morph[volume_id].edge_avoiding_morphology(swc_file=swc_filename, new_file=new_morph_name, original_position=position, original_rotation=rotation, random_seed=random_seed, k_dist=k_dist, n_random=n_random, max_angle=max_angle) if new_morphology: modified_morphologies.append((neuron_id, new_morphology)) except Exception as e: # If the code fails in parallel, let's just write a error file with some info import traceback import uuid file_name = f"error_{uuid.uuid4().hex}.txt" error_str = (f"Error in avoid_edges_helper:\n {bend_neuron_info =}, {network_path =}" f"Exception:\n {e}\n\nTraceback:\n{traceback.format_exc()}") print(error_str) print(f"Writing detailed error to {file_name}") # Write the error message to the file with open(file_name, "w") as file: file.write(error_str) for var in ["neuron_id", "swc_filename", "neuron_name", "new_morph_name", "random_seed", "position", "rotation", "k_dist", "n_random", "max_angle", "modified_morphologies"]: if var in locals(): value = locals()[var] file.write(f"\n {var} = {value}\n") else: file.write(f"\n '{var}' not defined.") raise e return modified_morphologies ############################################################################ def generate_extra_axon_info(self, source_neuron, position, config, rng): axon_info = [] if self.axon_config_cache is None: self.axon_config_cache = dict() for region in config["regions"]: for neuron_key, neuron_info in config["regions"][region]["neurons"].items(): if "axon_config" in neuron_info: axon_config = snudda_parse_path(neuron_info["axon_config"], self.snudda_data) with open(axon_config, "r") as f: self.axon_config_cache[neuron_key] = json.load(f) # Each neuron type has a set of axons to choose from source_neuron_type = source_neuron.split("_")[0] if source_neuron_type in self.axon_config_cache: axon_config = self.axon_config_cache[source_neuron_type] for axon_name, axon_data in axon_config.items(): axon_position, axon_rotation, axon_swc \ = self.get_projection_axon_location(source_position=position, proj_info=axon_data, rng=rng) axon_info.append([axon_name, axon_position, axon_rotation, axon_swc]) return axon_info else: # No extra axons for neuron return [] ######################################################################### def generate_extra_axon_info_old(self, source_neuron, position, config, rng): axon_info = [] if self.axon_config_cache is None: self.axon_config_cache = dict() for neuron_key, neuron_info in config["neurons"].items(): if "axon_config" in neuron_info: axon_config = snudda_parse_path(neuron_info["axon_config"], self.snudda_data) with open(axon_config, "r") as f: self.axon_config_cache[neuron_key] = json.load(f) if source_neuron in self.axon_config_cache: axon_config = self.axon_config_cache[source_neuron] for axon_name, axon_data in axon_config.items(): axon_position, axon_rotation, axon_swc \ = self.get_projection_axon_location(source_position=position, proj_info=axon_data, rng=rng) axon_info.append([axon_name, axon_position, axon_rotation, axon_swc]) return axon_info else: # No extra axons for neuron return [] def get_projection_axon_location(self, source_position, proj_info, rng, patch_hull=True): if 'projection' not in proj_info: raise KeyError("No 'projection' entry in the projection config!") proj_cfg = proj_info["projection"] if "projection_file" in proj_cfg and \ ("source" in proj_cfg or \ "destination" in proj_cfg): raise NotImplementedError("Projections should specify either a file or a mapping!") elif "source" in proj_cfg \ and "destination" in proj_cfg: source = np.array(proj_cfg["source"])*1e-6 destination = np.array(proj_cfg["destination"])*1e-6 elif "projection_file" in proj_cfg : with open(proj_cfg["projection_file"], 'r') as f: proj_file_data = json.load(f) source = np.array(proj_file_data["source"])*1e-6 destination = np.array(proj_file_data["destination"])*1e-6 else: raise NotImplementedError("Unknown projection configuration!") # specify the rotations of the termination zones rotation_cfg = proj_info.get("rotation", {}) if "rotation" in rotation_cfg: rotation = np.array(rotation_cfg["rotation"]) rot_position = destination elif "rotation_file" in rotation_cfg: with open(rotation_cfg["rotation_file"], 'r') as f: rotation_data = json.load(f) rotation = np.array(rotation_data["rotation"]) rot_position = np.array(rotation_data["position"])*1e-6 else: rotation = None rot_position = None target_centres = griddata(points=source, values=destination, xi=source_position, method="linear") # this checks if there are values outside of the convex hull to_patch = np.where(np.isnan(np.sum(target_centres, axis=1)))[0] # and patch missing entries if patch_hull and len(to_patch)>0: self.write_log(f"Patched {len(to_patch)}/{len(target_centres)}") target_centres_patched = griddata(points=source, values=destination, xi=source_position, method="nearest") target_centres[to_patch] = target_centres_patched[to_patch] # which coordinates to use for selecting rotation mapping = rotation_cfg.get('mapping', 'target') if mapping == "target": xi = target_centres elif mapping == "source": xi = source_position else: raise NotImplementedError(f"Unknown mapping '{mapping}'!") if rotation is not None: target_rotation = griddata(points=rot_position, values=rotation, xi=xi, method="linear") # if the rotation is specified as a field of rotation vectors, # then these need to be converted to matrices. if target_rotation[0].shape == (3,): rotation_matrices = \ [SnuddaRotate.rotation_matrix_from_vectors(np.array([0, 0, 1]), rv).flatten()\ for rv in target_rotation] target_rotation = np.array(rotation_matrices) else: target_rotation = [None for x in range(source_position.shape[0])] num_axons = len(proj_info["morphologies"]) axon_id = rng.choice(num_axons, source_position.shape[0]) axon_swc = [proj_info["morphologies"][x] for x in axon_id] return target_centres, target_rotation, axon_swc ############################################################################
[docs] def all_neuron_positions(self): """ Returns all neuron positions as a n x 3 matrix. """ n_neurons = len(self.neurons) pos = np.zeros((n_neurons, 3)) for i in range(0, n_neurons): pos[i, :] = self.neurons[i].position return pos
############################################################################
[docs] def all_neuron_rotations(self): """ Returns all neuron rotations as a n x 3 x 3 matrix. """ n_neurons = len(self.neurons) rot = np.zeros((n_neurons, 3, 3)) for i in range(0, n_neurons): rot[i, :, :] = self.neurons[i].rotation return rot
############################################################################
[docs] def all_neuron_names(self): """ Returns all neuron names as a list. """ return map(lambda x: x.name, self.neurons)
############################################################################ def gather_extra_axons(self): ax_neuron = [] ax_name = [] ax_swc = [] ax_position = [] ax_rotation = [] for n_idx, n in enumerate(self.neurons): for md_key, md in n.morphology_data.items(): if md_key != "neuron": ax_neuron.append(n_idx) # which neuron does axon belong to ax_name.append(md_key) ax_swc.append(md.swc_file) ax_position.append(md.position) ax_rotation.append(md.rotation.reshape((1, 9))) if len(ax_neuron) > 0: ax_neuron = np.array(ax_neuron) ax_position = np.vstack(ax_position) ax_rotation = np.vstack(ax_rotation) return ax_neuron, ax_name, ax_position, ax_rotation, ax_swc ############################################################################
[docs] def write_data(self, file_name=None): """ Writes position data to HDF5 file file_name. """ if not file_name: file_name = self.position_file assert len(self.neurons) > 0, "No neurons to save!" self.write_log(f"Writing data to HDF5 file: {file_name}") pos_file = h5py.File(file_name, "w", libver=self.h5libver) with open(self.config_file, 'r') as cfg_file: config = json.load(cfg_file) # Meta data save_meta_data = [(self.config_file, "config_file"), (json.dumps(config), "config"), (snudda_parse_path("$DATA", self.snudda_data), "snudda_data")] meta_group = pos_file.create_group("meta") for data, data_name in save_meta_data: meta_group.create_dataset(data_name, data=data) network_group = pos_file.create_group("network") # Neuron information neuron_group = network_group.create_group("neurons") # If the name list is longer than 20 chars, increase S20 name_list = [n.name.encode("ascii", "ignore") for n in self.neurons] str_type = 'S' + str(max(1, max([len(x) for x in name_list]))) neuron_group.create_dataset("name", (len(name_list),), str_type, name_list, compression="gzip") neuron_id_list = np.arange(len(self.neurons)) neuron_group.create_dataset("neuron_id", (len(neuron_id_list),), 'int', neuron_id_list) volume_id_list = [n.volume_id.encode("ascii", "ignore") for n in self.neurons] str_type_vid = 'S' + str(max(1, max([len(x) for x in volume_id_list]))) neuron_group.create_dataset("volume_id", (len(volume_id_list),), str_type_vid, volume_id_list, compression="gzip") hoc_list = [snudda_simplify_path(n.hoc, self.snudda_data).encode("ascii", "ignore") if hasattr(n, "hoc") else "" for n in self.neurons] max_hoc_len = max([len(x) for x in hoc_list]) max_hoc_len = max(max_hoc_len, 10) # In case there are none neuron_group.create_dataset("hoc", (len(hoc_list),), f"S{max_hoc_len}", hoc_list, compression="gzip") swc_list = [snudda_simplify_path(n.swc_filename, self.snudda_data).encode("ascii", "ignore") for n in self.neurons] max_swc_len = max([len(x) for x in swc_list]) neuron_group.create_dataset("morphology", (len(swc_list),), data=swc_list, dtype=h5py.special_dtype(vlen=bytes), compression="gzip") neuron_path = [snudda_simplify_path(n.neuron_path, self.snudda_data).encode("ascii", "ignore") for n in self.neurons] max_np_len = max([len(x) for x in neuron_path]) # neuron_group.create_dataset("neuron_path", (len(neuron_path),), f"S{max_np_len}", neuron_path) neuron_group.create_dataset("neuron_path", (len(neuron_path),), data=neuron_path, dtype=h5py.special_dtype(vlen=str)) virtual_neuron_list = np.array([n.virtual_neuron for n in self.neurons], dtype=bool) virtual_neuron = neuron_group.create_dataset("virtual_neuron", data=virtual_neuron_list) # Create dataset, filled further down neuron_position = neuron_group.create_dataset("position", (len(self.neurons), 3), "float", compression="gzip") neuron_rotation = neuron_group.create_dataset("rotation", (len(self.neurons), 9), "float", compression="gzip") # Write axons to hdf5 file ax_neuron, ax_name, ax_position, ax_rotation, ax_swc = self.gather_extra_axons() axon_group = neuron_group.create_group("extra_axons") axon_group.create_dataset("parent_neuron", data=ax_neuron) axon_group.create_dataset("name", (len(ax_name), ), data=ax_name, dtype=h5py.special_dtype(vlen=bytes), compression="gzip") axon_group.create_dataset("position", data=ax_position) axon_group.create_dataset("rotation", data=ax_rotation) axon_group.create_dataset("morphology", (len(ax_swc), ), data=ax_swc, dtype=h5py.special_dtype(vlen=bytes), compression="gzip") # TODO: Parent tree info, eller motsvarande, måste sparas också! pk_list = [n.parameter_key.encode("ascii", "ignore") if n.parameter_key is not None else "" for n in self.neurons] pk_str_type = 'S' + str(max(1, max([len(x) for x in pk_list]))) mk_list = [n.morphology_key.encode("ascii", "ignore") if n.morphology_key is not None else "" for n in self.neurons] mk_str_type = 'S' + str(max(1, max([len(x) for x in mk_list]))) mok_list = [n.modulation_key.encode("ascii", "ignore") if n.modulation_key is not None else "" for n in self.neurons] mok_str_type = 'S' + str(max(1, max([len(x) for x in mok_list]))) rd_list = [n.reaction_diffusion.encode("ascii", "ignore") if n.reaction_diffusion is not None else "" for n in self.neurons] rd_str_type = 'S' + str(max(1, max([len(x) for x in rd_list]))) neuron_param_key = neuron_group.create_dataset("parameter_key", (len(self.neurons),), pk_str_type, compression="gzip") neuron_morph_key = neuron_group.create_dataset("morphology_key", (len(self.neurons),), mk_str_type, compression="gzip") neuron_modulation_key = neuron_group.create_dataset("modulation_key", (len(self.neurons),), mok_str_type, compression="gzip") reaction_diffusion = neuron_group.create_dataset("reaction_diffusion_file", (len(self.neurons),), rd_str_type) neuron_pos_all = np.zeros((len(self.neurons), 3)) neuron_rot_all = np.zeros((len(self.neurons), 9)) neuron_param_key_list = [] neuron_param_key_idx = [] neuron_morph_key_list = [] neuron_morph_key_idx = [] neuron_mod_key_list = [] neuron_mod_key_idx = [] reacdiff_list = [] reacdiff_key = [] for (i, n) in enumerate(self.neurons): neuron_pos_all[i, :] = n.position neuron_rot_all[i, :] = n.rotation.reshape(1, 9) if n.parameter_key: neuron_param_key_list.append(n.parameter_key) neuron_param_key_idx.append(i) if n.morphology_key: neuron_morph_key_list.append(n.morphology_key) neuron_morph_key_idx.append(i) if n.modulation_key: neuron_mod_key_list.append(n.modulation_key) neuron_mod_key_idx.append(i) if n.reaction_diffusion: reacdiff_list.append(n.reaction_diffusion) reacdiff_key.append(i) neuron_position[:, :] = neuron_pos_all neuron_rotation[:, :] = neuron_rot_all if len(neuron_param_key_list) > 0: neuron_param_key[neuron_param_key_idx] = neuron_param_key_list if len(neuron_morph_key_list) > 0: neuron_morph_key[neuron_morph_key_idx] = neuron_morph_key_list if len(neuron_mod_key_list) > 0: neuron_modulation_key[neuron_mod_key_idx] = neuron_mod_key_list if len(reacdiff_list) > 0: reaction_diffusion[reacdiff_key] = reacdiff_list # Store input information if self.population_unit is None: # If no population units were defined, then set them all to 0 (= no population unit) self.population_unit = np.zeros((len(self.neurons),), dtype=int) neuron_group.create_dataset("population_unit_id", data=self.population_unit, dtype=int) # Variable for axon density "r", "xyz" or "" (No axon density) axon_density_type = [n.axon_density_type.encode("ascii", "ignore") if n.axon_density_type is not None else b"" for n in self.neurons] ad_str_type2 = "S" + str(max(1, max([len(x) if x is not None else 1 for x in axon_density_type]))) neuron_group.create_dataset("axon_density_type", (len(axon_density_type),), ad_str_type2, data=axon_density_type, compression="gzip") axon_density = [n.axon_density.encode("ascii", "ignore") if n.axon_density is not None else b"" for n in self.neurons] ad_str_type = "S" + str(max(1, max([len(x) if x is not None else 1 for x in axon_density]))) neuron_group.create_dataset("axon_density", (len(axon_density),), ad_str_type, data=axon_density, compression="gzip") axon_density_radius = [n.max_axon_radius if n.max_axon_radius is not None and n.axon_density_type == "r" else np.nan for n in self.neurons] neuron_group.create_dataset("axon_density_radius", data=axon_density_radius) # We also need to save axon_density_bounds_xyz, and num_axon points for the # non-spherical axon density option axon_density_bounds_xyz = np.nan * np.zeros((len(self.neurons), 6)) for ni, n in enumerate(self.neurons): if n.axon_density is None: # No axon density specified, skip continue if n.axon_density_type == "xyz": try: axon_density_bounds_xyz[ni, :] = np.array(n.axon_density_bounds_xyz) except: import traceback tstr = traceback.format_exc() self.write_log(tstr) self.write_log(f"Incorrect density string: {n.axon_density}") sys.exit(-1) neuron_group.create_dataset("axon_density_bounds_xyz", data=axon_density_bounds_xyz) pos_file.close() self.write_log(f"Write done.")
############################################################################
[docs] def define_population_units(self, config): """ Defines population units. """ method_lookup = {"random": self.random_labeling, "radial_density": self.population_unit_density_labeling, "mesh": self.population_unit_mesh} for region_name in self.config["regions"]: if "population_units" in self.config["regions"][region_name]: for unit_id in self.config["regions"][region_name]["population_units"]["unit_id"]: self.population_units[unit_id] = [] neuron_id = self.volume_neurons(region_name) method_name = self.config["regions"][region_name]["population_units"]["method"] assert method_name in method_lookup, \ (f"Unknown population placement method {method_name}. " f"Valid options are {', '.join([x for x in method_lookup])}") method_lookup[method_name](self.config["regions"][region_name]["population_units"], neuron_id)
############################################################################
[docs] def random_labeling(self, population_unit_info, neuron_id): """ Creates random labeling. Args: neuron_id (list) : All potential neuron ID population_unit_info (dict): Dictionary with "method" = "random" "unit_id" = ID of population unit "fraction_of_neurons" = Fraction of neurons in this unit "neuron_types" = List of neuron types that are in this unit "structure" = Name of the structure the unit is located in """ self.init_population_units() # This initialises population unit labelling if not already allocated unit_id = population_unit_info["unit_id"] fraction_of_neurons = population_unit_info["fraction_of_neurons"] neuron_types = population_unit_info["neuron_types"] # list of neuron types that belong to this population unit structure_name = population_unit_info["structure"] # First we need to generate unit lists (with fractions) for each neuron type units_available = dict() for uid, fon, neuron_type_list in zip(unit_id, fraction_of_neurons, neuron_types): for nt in neuron_type_list: if nt not in units_available: units_available[nt] = dict() units_available[nt]["unit"] = [] units_available[nt]["fraction"] = [] units_available[nt]["unit"].append(uid) units_available[nt]["fraction"].append(fon) all_neuron_types = [n.name.split("_")[0] for n in self.neurons] # Next we check that no fraction sum is larger than 1 for neuron_type in units_available: assert np.sum(units_available[neuron_type]["fraction"]) <= 1, \ (f"Population unit fraction sum for Neuron type {neuron_type} " f"in structure {structure_name} sums to more than 1.") assert (np.array(units_available[neuron_type]["fraction"]) >= 0).all(), \ f"Population unit fractions must be >= 0. Please check {neuron_type} in {structure_name}" cum_fraction = np.cumsum(units_available[neuron_type]["fraction"]) neurons_of_type = [self.neurons[nid].neuron_id for (nid, n_type) in zip(neuron_id, all_neuron_types) if n_type == neuron_type] rand_num = self.random_generator.uniform(size=len(neurons_of_type)) for nid, rn in zip(neurons_of_type, rand_num): # If our randum number is smaller than the first fraction, then neuron in first pop unit # if random number is between first and second cumulative fraction, then second pop unit # If larger than last cum_fraction, then no pop unit was picked (and we set pop unit 0) idx = len(cum_fraction) - np.sum(rn <= cum_fraction) if idx < len(cum_fraction): uid = units_available[neuron_type]["unit"][idx] self.population_unit[nid] = uid self.population_units[uid].append(nid) else: self.population_unit[nid] = 0
############################################################################
[docs] def population_unit_density_labeling(self, population_unit_info, neuron_id): """ Creates population units based on radial density functions. Args: neuron_id (list) : All potential neuron ID population_unit_info (dict): "method" must be "radial_density" "neuron_types" list of neuron types "centres" of radial probabilityes, one per neuron type "num_neurons" : Number of neurons in each population unit, only used with radial_density method "probability_functions" list of probability functions of r (as str) "unit_id" ID of population unit """ assert population_unit_info["method"] == "radial_density" self.init_population_units() # This initialises population unit labelling if not alraedy allocated neuron_types = population_unit_info["neuron_types"] centres = np.array(population_unit_info["centres"]) probability_functions = population_unit_info["probability_functions"] unit_id = np.array(population_unit_info["unit_id"]) if "num_neurons" in population_unit_info and population_unit_info["num_neurons"] is not None: num_neurons = population_unit_info["num_neurons"] if np.isscalar(num_neurons): num_neurons = np.full((len(centres),), num_neurons) else: num_neurons = [None for x in centres] assert len(neuron_types) == len(centres) == len(probability_functions) == len(unit_id) == len(num_neurons) # xyz = self.all_neuron_positions() unit_probability = np.zeros(centres.shape[0]) for nid in neuron_id: pos = self.neurons[nid].position neuron_type = self.neurons[nid].name.split("_")[0] for idx, (centre_pos, neuron_type_list, p_func) \ in enumerate(zip(centres, neuron_types, probability_functions)): if neuron_type in neuron_type_list: d = np.linalg.norm(pos - centre_pos) unit_probability[idx] = numexpr.evaluate(p_func) else: unit_probability[idx] = 0 # That unit does not contain this neuron type # Next we randomise membership rand_num = self.random_generator.uniform(size=len(unit_probability)) member_flag = rand_num < unit_probability # Currently we only allow a neuron to be member of one population unit n_flags = np.sum(member_flag) if n_flags == 0: self.population_unit[nid] = 0 elif n_flags == 1: self.population_unit[nid] = unit_id[np.where(member_flag)[0][0]] else: # More than one unit, pick the one that had smallest relative randnum idx = np.argmax(np.multiply(np.divide(unit_probability, rand_num), member_flag)) self.population_unit[nid] = unit_id[idx] # Also update dictionary with lists of neurons of that unit for uid in unit_id: # Channel 0 is unassigned, no channel, poor homeless neurons! self.population_units[uid] = np.where(self.population_unit == uid)[0] # Finally if num_neurons is specified, we need to reduce the number of neurons belonging to that unit for u_id, n_neurons in zip(unit_id, num_neurons): if n_neurons is not None: assert len(self.population_units[u_id]) >= n_neurons, \ f"Unable to pick {n_neurons} for population unit {u_id}, only {len(self.population_units[u_id])} ({neuron_types[np.where(unit_id == u_id)[0][0]]}) available." if n_neurons < len(self.population_units[u_id]): perm_nid = np.random.permutation(self.population_units[u_id]) keep_nid = perm_nid[:n_neurons] self.population_units[u_id] = keep_nid remove_nid = perm_nid[n_neurons:] for rid in remove_nid: self.population_unit[rid] = 0 if 0 in self.population_units: self.population_units[0] = np.sort(np.array(list(set(self.population_units[0]).union(remove_nid)))) else: self.population_units[0] = remove_nid
def population_unit_mesh(self, population_unit_info, neuron_id): self.init_population_units() # This initialises population unit labelling if not already allocated unit_id = population_unit_info["unit_id"] mesh_file = population_unit_info["mesh_file"] fraction_of_neurons = population_unit_info["fraction_of_neurons"] neuron_types = population_unit_info["neuron_types"] # list of neuron types that belong to this population unit structure_name = population_unit_info["structure"] pos = np.vstack([self.neurons[nid].position for nid in neuron_id]) model_neuron_types = [self.neurons[nid].name.split("_")[0] for nid in neuron_id] member_probability = np.zeros(shape=(pos.shape[0], len(mesh_file))) for idx, (mf, frac, nts) in enumerate(zip(mesh_file, fraction_of_neurons, neuron_types)): # This checks if neurons are of the types that are included in population unit has_nt = np.array([n in nts for n in model_neuron_types], dtype=bool) rm = RegionMesh(mf, verbose=self.verbose) member_probability[:, idx] = np.logical_and(rm.check_inside(pos), has_nt) * frac # If the probability sums to more than 1, then normalise it, otherwise keep smaller member_probability = np.divide(member_probability, np.maximum(1, np.sum(member_probability, axis =1).reshape(len(member_probability),1))) # Also, we need to add population unit 0 as an option, since choice needs P_sum = 1 full_member_probability = np.zeros(shape=(member_probability.shape[0], member_probability.shape[1]+1)) full_member_probability[:, :-1] = member_probability full_member_probability[:, -1] = np.maximum(1 - np.sum(member_probability, axis=1), 0) all_unit_id = unit_id + [0] # Normalise to 1 row_sums = np.sum(full_member_probability, axis=1) row_sums = row_sums[:, np.newaxis] full_member_probability = full_member_probability / row_sums for idx, (nid, P) in enumerate(zip(neuron_id, full_member_probability)): uid = self.random_generator.choice(all_unit_id, p=P) self.population_unit[nid] = uid if uid > 0: self.population_units[uid].append(nid) ############################################################################
[docs] def init_population_units(self): """ Initialise population units. If none are given they are all set to 0.""" if not self.population_unit: # If no population units were defined, then set them all to 0 (= no population unit) self.population_unit = np.zeros((len(self.neurons),), dtype=int)
# TODO: In prune we later need to gather all synapses belonging to each neuron, which means opening # the hypervoxel files that contain the worker's neurons' synapses. Therefore it is good to have # only nearby neurons on the same worker. Come up with a better scheme for sorting neurons. # def plot_ranges(self): from matplotlib import pyplot as plt n_workers = len(self.d_view) if self.d_view is not None else 1 range_borders = np.linspace(0, len(self.neurons), n_workers + 1).astype(int) colours = np.zeros((len(self.neurons),)) r_start = 0 for idx, r_end in enumerate(range_borders[1:]): colours[r_start:r_end] = idx + 1 r_start = r_end xyz = self.all_neuron_positions() fig = plt.figure() ax = fig.add_subplot(projection='3d') ax.scatter(xyz[:, 0], xyz[:, 1], xyz[:, 2], c=colours, alpha=0.5) plt.show()
[docs] def cluster_neurons(self, n_trials=5, rng=None): """ Cluster neurons, so that nearby neurons are grouped on same worker, to speed up simulations. Args: n_trials (int) : Number of trials for k-means clustering (default 5) """ if rng is None: rng = self.random_generator n_workers = len(self.d_view) if self.d_view is not None else 1 n_clusters = np.maximum(n_workers * 5, 100) n_clusters = np.minimum(n_clusters, len(self.neurons)) if n_workers > 1: self.write_log(f"Neurons order is optimised for {n_workers} workers. " f"For reproducibility always use the same number of workers.") xyz = self.all_neuron_positions() centroids, labels = scipy.cluster.vq.kmeans2(xyz, n_clusters, minit="points", seed=rng) n_centroids = centroids.shape[0] assert n_centroids == n_clusters cluster_member_list = [[] for x in range(n_centroids)] # Find the members of each cluster for idx, val in enumerate(labels): cluster_member_list[val].append(idx) num_neurons = xyz.shape[0] range_borders = np.linspace(0, num_neurons, n_workers + 1).astype(int) global_centroid_order = list(np.argsort(np.sum(centroids, axis=1))) neuron_order = -np.ones((num_neurons,), dtype=int) neuron_order_ctr = 0 range_start = 0 for range_end in range_borders[1:]: # While within a range, we want the clusters closest to the current primary cluster current_cluster = global_centroid_order[0] d = np.linalg.norm(centroids - centroids[current_cluster, :], axis=1) local_centroid_order = list(np.argsort(d)) while range_start < range_end and len(local_centroid_order) > 0: while len(cluster_member_list[local_centroid_order[0]]) == 0: del local_centroid_order[0] current_cluster = local_centroid_order[0] take_n = np.minimum(len(cluster_member_list[current_cluster]), range_end - range_start) neuron_order[neuron_order_ctr:neuron_order_ctr + take_n] = cluster_member_list[current_cluster][:take_n] neuron_order_ctr += take_n del cluster_member_list[current_cluster][:take_n] range_start += take_n while len(global_centroid_order) > 0 and len(cluster_member_list[global_centroid_order[0]]) == 0: del global_centroid_order[0] if len(global_centroid_order) == 0: break # Sometimes the original cluster is bad? Try again... if np.count_nonzero(neuron_order < 0) > 0 and n_trials > 1: self.write_log(f"Redoing place:neuron_clustering, {np.count_nonzero(neuron_order < 0)} " f"neurons unaccounted for", is_error=True) self.write_log(f"incorrect neuron_order={neuron_order} (printed for debugging)") neuron_order = self.cluster_neurons(n_trials=n_trials - 1, rng=rng) # TODO: This occured once on Tegner, why did it happen? assert np.count_nonzero(neuron_order < 0) == 0, \ "cluster_neurons: Not all neurons accounted for. Please rerun place." # Just some check that all is ok assert (np.diff(np.sort(neuron_order)) == 1).all(), "cluster_neurons: There are gaps in the sorting, error" # TODO: Verify that sort order is ok return neuron_order
[docs] def sort_neurons(self, sort_idx=None): """ Sorting neurons. If no argument is given they will be sorted along x,y,z axis. To use cluster sorting, use: sp.sort_neurons(sort_idx=sp.cluster_neurons()) Args: sort_idx (list, optional) : Sort order """ if sort_idx is None: # This changes the neuron IDs so the neurons are sorted along x,y or z xyz = self.all_neuron_positions() sort_idx = np.lexsort(xyz[:, [2, 1, 0]].transpose()) # x, y, z sort order self.write_log("Re-sorting the neuron IDs") for new_idx, old_idx in enumerate(sort_idx): self.neurons[old_idx].neuron_id = new_idx self.neurons = [self.neurons[x] for x in sort_idx] for idx, n in enumerate(self.neurons): assert idx == self.neurons[idx].neuron_id, \ "Something went wrong with sorting"
def volume_neurons(self, volume_id): return [n.neuron_id for n in self.neurons if n.volume_id == volume_id] @staticmethod def replicate_str(string, n_replicas, variable_name=None): # variable_name is just used to help with error message if incorrect input if string is None or isinstance(string, str): rep_str = [string for x in range(len(n_replicas))] elif len(string) == n_replicas: rep_str = string else: raise ValueError(f"Expected a str or a list of str of length {len(n_replicas)} for {variable_name}") return rep_str
############################################################################ if __name__ == "__main__": assert False, "Please use snudda.py place networks/yournetwork"