Source code for snudda.detect.project

# In addition to adding synapses using touch detection we also want the ability to add connections
# when we do not have an axon, for example over long range between structures, to connect them together.
# This is what project.py is responsible for.
from collections import OrderedDict

import numpy as np
import json
import os
import h5py

from scipy.interpolate import griddata

from snudda.detect.detect import SnuddaDetect
from snudda.neurons.neuron_morphology import NeuronMorphology
from snudda.neurons.neuron_prototype import NeuronPrototype
from snudda.utils.load import SnuddaLoad


[docs]class SnuddaProject(object): """ Adds projections between neurons, useful for connecting different regions with long range connections. """ # TODO: Add support for log files!! # TODO: Add support for parallel execution def __init__(self, network_path, rng=None, random_seed=None, h5libver=None): """ Constructor. Args: network_path: Path to network directory rng: Numpy random stream random_seed: Random seed h5libver: Version of hdf5 driver to use """ self.network_path = network_path self.network_info = None self.work_history_file = os.path.join(self.network_path, "log", "network-detect-worklog.hdf5") self.output_file_name = os.path.join(self.network_path, "network-projection-synapses.hdf5") max_synapses = 100000 self.synapses = np.zeros((max_synapses, 13), dtype=np.int32) self.synapse_ctr = 0 self.connectivity_distributions = dict() self.prototype_neurons = dict() self.next_channel_model_id = 10 self.simulation_origo = None self.voxel_size = None # Parameters for the HDF5 writing, this affects write speed self.synapse_chunk_size = 10000 self.h5compression = "lzf" config_file = os.path.join(self.network_path, "network-config.json") with open(config_file, "r") as f: self.config = json.load(f, object_pairs_hook=OrderedDict) if not h5libver: self.h5libver = "latest" else: self.h5libver = h5libver # Setup random generator, # TODO: this assumes serial execution. Update for parallel version if rng: self.rng = rng elif random_seed: self.rng = np.random.default_rng(random_seed) else: random_seed = self.config["RandomSeed"]["project"] self.rng = np.random.default_rng(random_seed) self.read_neuron_positions() self.read_prototypes()
[docs] def read_neuron_positions(self): """ Reads in neuron positions from network-neuron-positions.hdf5 """ position_file = os.path.join(self.network_path, "network-neuron-positions.hdf5") self.network_info = SnuddaLoad(position_file) # We also need simulation origo and voxel size work_history_file = os.path.join(self.network_path, "log", "network-detect-worklog.hdf5") with h5py.File(work_history_file, "r") as work_hist: self.simulation_origo = work_hist["meta/simulationOrigo"][()] self.voxel_size = work_hist["meta/voxelSize"][()]
# This is a simplified version of the prototype load in detect
[docs] def read_prototypes(self): """ Reads in neuron prototypes. Simplified version of what same function in detect.py does. """ for name, definition in self.config["Neurons"].items(): morph = definition["morphology"] param = definition["parameters"] if "modulation" in definition: modulation = definition["modulation"] else: modulation = None mechanisms = definition["mechanisms"] # TODO: Need to update to use NeuronPrototype !!! self.prototype_neurons[name] = NeuronPrototype(neuron_name=name, neuron_path=None, morphology_path=morph, parameter_path=param, modulation_path=modulation, mechanism_path=mechanisms) # TODO: The code below is duplicate from detect.py, update so both use same code base for name, definition in self.config["Connectivity"].items(): pre_type, post_type = name.split(",") con_def = definition.copy() for key in con_def: if key == "GapJunction": con_def[key]["channelModelID"] = 3 else: con_def[key]["channelModelID"] = self.next_channel_model_id self.next_channel_model_id += 1 # Also if conductance is just a number, add std 0 if type(con_def[key]["conductance"]) not in [list, tuple]: con_def[key]["conductance"] = [con_def[key]["conductance"], 0] self.connectivity_distributions[pre_type, post_type] = con_def
[docs] def project(self, write=True): """ Create projections between neurons. """ for (pre_type, post_type), connection_info in self.connectivity_distributions.items(): self.connect_projection_helper(pre_type, post_type, connection_info) if write: self.write()
[docs] def connect_projection_helper(self, pre_neuron_type, post_neuron_type, connection_info): """ Helper function for project. Args: pre_neuron_type: Type of presynaptic neuron post_neuron_type: Type of postsynaptic neuron connection_info: Connection info """ for connection_type, con_info in connection_info.items(): if "projectionFile" not in con_info: # Not a projection, skipping continue projection_file = con_info["projectionFile"] with open(projection_file, "r") as f: projection_data = json.load(f, object_pairs_hook=OrderedDict) if "projectionName" in con_info: proj_name = con_info["projectionName"] projection_source = np.array(projection_data[proj_name]["source"])*1e-6 projection_destination = np.array(projection_data[proj_name]["destination"])*1e-6 else: projection_source = np.array(projection_data["source"])*1e-6 projection_destination = np.array(projection_data["destination"])*1e-6 if "projectionRadius" in con_info: projection_radius = con_info["projectionRadius"] else: projection_radius = None # Find the closest neurons # TODO: Add projectionDensity later # if "projectionDensity" in con_info: # projection_density = con_info["projectionDensity"] # else: # projection_density = None # All neurons within projection radius equally likely if "numberOfTargets" in con_info: if type(con_info["numberOfTargets"]) == list: number_of_targets = np.array(con_info["numberOfTargets"]) # mean, std else: number_of_targets = np.array([con_info["numberOfTargets"], 0]) if "numberOfSynapses" in con_info: if type(con_info["numberOfSynapses"]) == list: number_of_synapses = np.array(con_info["numberOfSynapses"]) # mean, std else: number_of_synapses = np.array([con_info["numberOfSynapses"], 0]) else: number_of_synapses = np.array([1, 0]) if "dendriteSynapseDensity" in con_info: dendrite_synapse_density = con_info["dendriteSynapseDensity"] if type(con_info["conductance"]) == list: conductance_mean, conductance_std = con_info["conductance"] else: conductance_mean, conductance_std = con_info["conductance"], 0 # The channelModelID is added to config information channel_model_id = con_info["channelModelID"] # Find all the presynaptic neurons in the network pre_id_list = self.network_info.get_neuron_id_of_type(pre_neuron_type) pre_positions = self.network_info.data["neuronPositions"][pre_id_list, :] # Find all the postsynaptic neurons in the network post_id_list = self.network_info.get_neuron_id_of_type(post_neuron_type) post_name_list = [self.network_info.data["name"][x] for x in post_id_list] post_positions = self.network_info.data["neuronPositions"][post_id_list, :] # For each presynaptic neuron, find their target regions. # -- if you want two distinct target regions, you have to create two separate maps target_centres = griddata(points=projection_source, values=projection_destination, xi=pre_positions, method="linear") num_targets = self.rng.normal(number_of_targets[0], number_of_targets[1], len(pre_id_list)).astype(int) # For each presynaptic neuron, using the supplied map, find the potential post synaptic targets for pre_id, centre_pos, n_targets in zip(pre_id_list, target_centres, num_targets): d = np.linalg.norm(centre_pos - post_positions, axis=1) # !! Double check right axis d_idx = np.argsort(d) if projection_radius: d_idx = d_idx[np.where(d[d_idx] <= projection_radius)[0]] if len(d_idx) > n_targets: d_idx = self.rng.permutation(d_idx)[:n_targets] elif len(d_idx) > n_targets: d_idx = d_idx[:n_targets] target_id = [post_id_list[x] for x in d_idx] target_name = [post_name_list[x] for x in d_idx] axon_dist = d[d_idx] n_synapses = self.rng.normal(number_of_synapses[0], number_of_synapses[1], len(target_id)).astype(int) for t_id, t_name, n_syn, ax_dist in zip(target_id, target_name, n_synapses, axon_dist): # We need to place neuron correctly in space (work on clone), # so that synapse coordinates are correct morph_prototype = self.prototype_neurons[t_name] position = self.network_info.data["neurons"][t_id]["position"] rotation = self.network_info.data["neurons"][t_id]["rotation"] parameter_id = self.network_info.data["neurons"][t_id]["parameterID"] morphology_id = self.network_info.data["neurons"][t_id]["morphologyID"] modulation_id = self.network_info.data["neurons"][t_id]["modulationID"] morph = morph_prototype.clone(parameter_id=parameter_id, morphology_id=morphology_id, modulation_id=modulation_id, position=position, rotation=rotation) # We are not guaranteed to get n_syn positions, so use len(sec_x) to get how many after # TODO: Fix so dendrite_input_locations always returns n_syn synapses xyz, sec_id, sec_x, dist_to_soma = morph.dendrite_input_locations(dendrite_synapse_density, self.rng, num_locations=n_syn) # We need to convert xyz into voxel coordinates to match data format of synapse matrix xyz = np.round((xyz - self.simulation_origo) / self.voxel_size) # https://www.nature.com/articles/nrn3687 -- lognormal # TODO: Precompute these mu = np.log(conductance_mean ** 2 / np.sqrt(conductance_mean ** 2 + conductance_std ** 2)) sigma = np.sqrt(np.log(1 + conductance_std ** 2 / conductance_mean ** 2)) cond = self.rng.lognormal(mu, sigma, len(sec_x)) cond = np.maximum(cond, conductance_mean*0.1) # Lower bound, prevent negative. param_id = self.rng.integers(1000000, size=len(sec_x)) # TODO: Add code to extend synapses matrix if it is full for i in range(len(sec_id)): self.synapses[self.synapse_ctr, :] = \ [pre_id, t_id, xyz[i, 0], xyz[i, 1], xyz[i, 2], -1, # Hypervoxelid channel_model_id, ax_dist*1e6, dist_to_soma[i]*1e6, sec_id[i], sec_x[i] * 1000, cond[i] * 1e12, param_id[i]] self.synapse_ctr += 1
[docs] def write(self): """ Writes projection data to file. """ # Before writing synapses, lets make sure they are sorted. # Sort order: columns 1 (dest), 0 (src), 6 (synapse type) sort_idx = np.lexsort(self.synapses[:self.synapse_ctr, [6, 0, 1]].transpose()) self.synapses[:self.synapse_ctr, :] = self.synapses[sort_idx, :] # Write synapses to file with h5py.File(self.output_file_name, "w", libver=self.h5libver) as out_file: out_file.create_dataset("config", data=json.dumps(self.config)) network_group = out_file.create_group("network") network_group.create_dataset("synapses", data=self.synapses[:self.synapse_ctr, :], dtype=np.int32, chunks=(self.synapse_chunk_size, 13), maxshape=(None, 13), compression=self.h5compression) network_group.create_dataset("nSynapses", data=self.synapse_ctr, dtype=int) network_group.create_dataset("nNeurons", data=self.network_info.data["nNeurons"], dtype=int) # This is useful so the merge_helper knows if they need to search this file for synapses all_target_id = np.unique(self.synapses[:self.synapse_ctr, 1]) network_group.create_dataset("allTargetId", data=all_target_id) # This creates a lookup that is used for merging later synapse_lookup = SnuddaDetect.create_lookup_table(data=self.synapses, n_rows=self.synapse_ctr, data_type="synapses", num_neurons=self.network_info.data["nNeurons"], max_synapse_type=self.next_channel_model_id) network_group.create_dataset("synapseLookup", data=synapse_lookup) network_group.create_dataset("maxChannelTypeID", data=self.next_channel_model_id, dtype=int) # We also need to update the work history file with how many synapses we created # for the projections between volumes with h5py.File(self.work_history_file, "a", libver=self.h5libver) as hist_file: if "nProjectionSynapses" in hist_file: hist_file["nProjectionSynapses"][()] = self.synapse_ctr else: hist_file.create_dataset("nProjectionSynapses", data=self.synapse_ctr, dtype=int)