# In addition to adding synapses using touch detection we also want the ability to add connections
# when we do not have an axon, for example over long range between structures, to connect them together.
# This is what project.py is responsible for.
import json
import os
from collections import OrderedDict
import h5py
import numpy as np
from scipy.interpolate import griddata
from snudda.utils.snudda_path import get_snudda_data
from snudda.detect.detect import SnuddaDetect
from snudda.neurons.neuron_prototype import NeuronPrototype
from snudda.utils.load import SnuddaLoad
[docs]class SnuddaProject(object):
""" Adds projections between neurons, useful for connecting different regions with long range connections. """
# TODO: Add support for log files!!
# TODO: Add support for parallel execution
def __init__(self, network_path, snudda_data=None, rng=None, random_seed=None, h5libver=None):
"""
Constructor.
Args:
network_path: Path to network directory
rng: Numpy random stream
random_seed: Random seed
h5libver: Version of hdf5 driver to use
"""
self.network_path = network_path
self.snudda_data = get_snudda_data(snudda_data=snudda_data,
network_path=self.network_path)
self.network_info = None
self.work_history_file = os.path.join(self.network_path, "log", "network-detect-worklog.hdf5")
self.output_file_name = os.path.join(self.network_path, "network-projection-synapses.hdf5")
max_synapses = 100000
self.synapses = np.zeros((max_synapses, 13), dtype=np.int32)
self.synapse_ctr = 0
self.connectivity_distributions = dict()
self.prototype_neurons = dict()
self.next_channel_model_id = 10
self.simulation_origo = None
self.voxel_size = None
# Parameters for the HDF5 writing, this affects write speed
self.synapse_chunk_size = 10000
self.h5compression = "lzf"
config_file = os.path.join(self.network_path, "network-config.json")
with open(config_file, "r") as f:
self.config = json.load(f, object_pairs_hook=OrderedDict)
if not h5libver:
self.h5libver = "latest"
else:
self.h5libver = h5libver
# Setup random generator,
# TODO: this assumes serial execution. Update for parallel version
if rng:
self.rng = rng
elif random_seed:
self.rng = np.random.default_rng(random_seed)
else:
random_seed = self.config["RandomSeed"]["project"]
self.rng = np.random.default_rng(random_seed)
self.read_neuron_positions()
self.read_prototypes()
[docs] def read_neuron_positions(self):
""" Reads in neuron positions from network-neuron-positions.hdf5 """
position_file = os.path.join(self.network_path, "network-neuron-positions.hdf5")
self.network_info = SnuddaLoad(position_file)
# We also need simulation origo and voxel size
work_history_file = os.path.join(self.network_path, "log", "network-detect-worklog.hdf5")
with h5py.File(work_history_file, "r") as work_hist:
self.simulation_origo = work_hist["meta/simulationOrigo"][()]
self.voxel_size = work_hist["meta/voxelSize"][()]
# This is a simplified version of the prototype load in detect
[docs] def read_prototypes(self):
""" Reads in neuron prototypes. Simplified version of what same function in detect.py does. """
for name, definition in self.config["Neurons"].items():
morph = definition["morphology"]
param = definition["parameters"]
if "modulation" in definition:
modulation = definition["modulation"]
else:
modulation = None
mechanisms = definition["mechanisms"]
# TODO: Need to update to use NeuronPrototype !!!
self.prototype_neurons[name] = NeuronPrototype(neuron_name=name,
neuron_path=None,
snudda_data=self.snudda_data,
morphology_path=morph,
parameter_path=param,
modulation_path=modulation,
mechanism_path=mechanisms)
# TODO: The code below is duplicate from detect.py, update so both use same code base
for name, definition in self.config["Connectivity"].items():
pre_type, post_type = name.split(",")
con_def = definition.copy()
for key in con_def:
if key == "GapJunction":
con_def[key]["channelModelID"] = 3
else:
con_def[key]["channelModelID"] = self.next_channel_model_id
self.next_channel_model_id += 1
# Also if conductance is just a number, add std 0
if type(con_def[key]["conductance"]) not in [list, tuple]:
con_def[key]["conductance"] = [con_def[key]["conductance"], 0]
self.connectivity_distributions[pre_type, post_type] = con_def
[docs] def project(self, write=True):
""" Create projections between neurons. """
for (pre_type, post_type), connection_info in self.connectivity_distributions.items():
self.connect_projection_helper(pre_type, post_type, connection_info)
if write:
self.write()
[docs] def connect_projection_helper(self, pre_neuron_type, post_neuron_type, connection_info):
"""
Helper function for project.
Args:
pre_neuron_type: Type of presynaptic neuron
post_neuron_type: Type of postsynaptic neuron
connection_info: Connection info
"""
for connection_type, con_info in connection_info.items():
if "projectionFile" not in con_info or "projectionName" not in con_info:
# Not a projection, skipping
continue
projection_file = con_info["projectionFile"]
with open(projection_file, "r") as f:
projection_data = json.load(f, object_pairs_hook=OrderedDict)
if "projectionName" in con_info:
proj_name = con_info["projectionName"]
projection_source = np.array(projection_data[proj_name]["source"]) * 1e-6
projection_destination = np.array(projection_data[proj_name]["destination"]) * 1e-6
else:
projection_source = np.array(projection_data["source"]) * 1e-6
projection_destination = np.array(projection_data["destination"]) * 1e-6
if "projectionRadius" in con_info:
projection_radius = con_info["projectionRadius"]
else:
projection_radius = None # Find the closest neurons
# TODO: Add projectionDensity later
# if "projectionDensity" in con_info:
# projection_density = con_info["projectionDensity"]
# else:
# projection_density = None # All neurons within projection radius equally likely
if "numberOfTargets" in con_info:
if type(con_info["numberOfTargets"]) == list:
number_of_targets = np.array(con_info["numberOfTargets"]) # mean, std
else:
number_of_targets = np.array([con_info["numberOfTargets"], 0])
if "numberOfSynapses" in con_info:
if type(con_info["numberOfSynapses"]) == list:
number_of_synapses = np.array(con_info["numberOfSynapses"]) # mean, std
else:
number_of_synapses = np.array([con_info["numberOfSynapses"], 0])
else:
number_of_synapses = np.array([1, 0])
if "dendriteSynapseDensity" in con_info:
dendrite_synapse_density = con_info["dendriteSynapseDensity"]
if type(con_info["conductance"]) == list:
conductance_mean, conductance_std = con_info["conductance"]
else:
conductance_mean, conductance_std = con_info["conductance"], 0
# The channelModelID is added to config information
channel_model_id = con_info["channelModelID"]
# Find all the presynaptic neurons in the network
pre_id_list = self.network_info.get_neuron_id_of_type(pre_neuron_type)
pre_positions = self.network_info.data["neuronPositions"][pre_id_list, :]
# Find all the postsynaptic neurons in the network
post_id_list = self.network_info.get_neuron_id_of_type(post_neuron_type)
post_name_list = [self.network_info.data["name"][x] for x in post_id_list]
post_positions = self.network_info.data["neuronPositions"][post_id_list, :]
# For each presynaptic neuron, find their target regions.
# -- if you want two distinct target regions, you have to create two separate maps
target_centres = griddata(points=projection_source,
values=projection_destination,
xi=pre_positions, method="linear")
num_targets = self.rng.normal(number_of_targets[0],
number_of_targets[1],
len(pre_id_list)).astype(int)
# For each presynaptic neuron, using the supplied map, find the potential post synaptic targets
for pre_id, centre_pos, n_targets in zip(pre_id_list, target_centres, num_targets):
d = np.linalg.norm(centre_pos - post_positions, axis=1) # !! Double check right axis
d_idx = np.argsort(d)
if projection_radius:
d_idx = d_idx[np.where(d[d_idx] <= projection_radius)[0]]
if len(d_idx) > n_targets:
d_idx = self.rng.permutation(d_idx)[:n_targets]
elif len(d_idx) > n_targets:
d_idx = d_idx[:n_targets]
target_id = [post_id_list[x] for x in d_idx]
target_name = [post_name_list[x] for x in d_idx]
axon_dist = d[d_idx]
n_synapses = self.rng.normal(number_of_synapses[0],
number_of_synapses[1],
len(target_id)).astype(int)
for t_id, t_name, n_syn, ax_dist in zip(target_id, target_name, n_synapses, axon_dist):
# We need to place neuron correctly in space (work on clone),
# so that synapse coordinates are correct
morph_prototype = self.prototype_neurons[t_name]
position = self.network_info.data["neurons"][t_id]["position"]
rotation = self.network_info.data["neurons"][t_id]["rotation"]
parameter_id = self.network_info.data["neurons"][t_id]["parameterID"]
morphology_id = self.network_info.data["neurons"][t_id]["morphologyID"]
modulation_id = self.network_info.data["neurons"][t_id]["modulationID"]
morph = morph_prototype.clone(parameter_id=parameter_id,
morphology_id=morphology_id,
modulation_id=modulation_id,
position=position,
rotation=rotation)
# We are not guaranteed to get n_syn positions, so use len(sec_x) to get how many after
# TODO: Fix so dendrite_input_locations always returns n_syn synapses
xyz, sec_id, sec_x, dist_to_soma = morph.dendrite_input_locations(dendrite_synapse_density,
self.rng,
num_locations=n_syn)
# We need to convert xyz into voxel coordinates to match data format of synapse matrix
xyz = np.round((xyz - self.simulation_origo) / self.voxel_size)
# https://www.nature.com/articles/nrn3687 -- lognormal
# TODO: Precompute these
mu = np.log(conductance_mean ** 2 / np.sqrt(conductance_mean ** 2 + conductance_std ** 2))
sigma = np.sqrt(np.log(1 + conductance_std ** 2 / conductance_mean ** 2))
cond = self.rng.lognormal(mu, sigma, len(sec_x))
cond = np.maximum(cond, conductance_mean * 0.1) # Lower bound, prevent negative.
param_id = self.rng.integers(1000000, size=len(sec_x))
# TODO: Add code to extend synapses matrix if it is full
for i in range(len(sec_id)):
self.synapses[self.synapse_ctr, :] = \
[pre_id, t_id,
xyz[i, 0], xyz[i, 1], xyz[i, 2],
-1, # Hypervoxelid
channel_model_id,
ax_dist * 1e6, dist_to_soma[i] * 1e6,
sec_id[i], sec_x[i] * 1000,
cond[i] * 1e12, param_id[i]]
self.synapse_ctr += 1
[docs] def write(self):
""" Writes projection data to file. """
# Before writing synapses, lets make sure they are sorted.
# Sort order: columns 1 (dest), 0 (src), 6 (synapse type)
sort_idx = np.lexsort(self.synapses[:self.synapse_ctr, [6, 0, 1]].transpose())
self.synapses[:self.synapse_ctr, :] = self.synapses[sort_idx, :]
# Write synapses to file
with h5py.File(self.output_file_name, "w", libver=self.h5libver) as out_file:
out_file.create_dataset("config", data=json.dumps(self.config))
network_group = out_file.create_group("network")
network_group.create_dataset("synapses",
data=self.synapses[:self.synapse_ctr, :],
dtype=np.int32,
chunks=(self.synapse_chunk_size, 13),
maxshape=(None, 13),
compression=self.h5compression)
network_group.create_dataset("nSynapses", data=self.synapse_ctr, dtype=int)
network_group.create_dataset("nNeurons", data=self.network_info.data["nNeurons"], dtype=int)
# This is useful so the merge_helper knows if they need to search this file for synapses
all_target_id = np.unique(self.synapses[:self.synapse_ctr, 1])
network_group.create_dataset("allTargetId", data=all_target_id)
# This creates a lookup that is used for merging later
synapse_lookup = SnuddaDetect.create_lookup_table(data=self.synapses,
n_rows=self.synapse_ctr,
data_type="synapses",
num_neurons=self.network_info.data["nNeurons"],
max_synapse_type=self.next_channel_model_id)
network_group.create_dataset("synapseLookup", data=synapse_lookup)
network_group.create_dataset("maxChannelTypeID", data=self.next_channel_model_id, dtype=int)
# We also need to update the work history file with how many synapses we created
# for the projections between volumes
with h5py.File(self.work_history_file, "a", libver=self.h5libver) as hist_file:
if "nProjectionSynapses" in hist_file:
hist_file["nProjectionSynapses"][()] = self.synapse_ctr
else:
hist_file.create_dataset("nProjectionSynapses", data=self.synapse_ctr, dtype=int)