import os
import sys
import glob
import logging

import numpy as np
import pandas as pd
import scipy.sparse
import scanpy as sc
import anndata as ad
from xopen import xopen
import fast_matrix_market as fmm

logging.basicConfig(format='%(asctime)s.%(msecs)03d %(levelname)-8s|%(message)s\t|%(module)s:%(funcName)s:%(lineno)d|%(process)d:%(thread)d|%(pathname)s',
                    datefmt='%Y-%m-%dT%H:%M:%S', level=logging.INFO)
logger = logging.getLogger(__name__)

from concurrent.futures import ThreadPoolExecutor, as_completed


def export_adata(adata, path):
    """
    Export AnnData object to 10x Genomics format with gzip compression using xopen.

    adata : AnnData
        The AnnData object to export.
    path : str
        The directory where the files should be saved.
    """
    # Create destination directory if it doesn't exist
    os.makedirs(path, exist_ok=True)
    # Step 1: Append spatial coordinates directly to obs
    if 'spatial' in adata.obsm:
        adata.obs['m_spatial_x'] = adata.obsm['spatial'][:, 0]  # X coordinates
        adata.obs['m_spatial_y'] = adata.obsm['spatial'][:, 1]  # Y coordinates

    genes_path = os.path.join(path, "features.tsv.gz")
    barcodes_path = os.path.join(path, "barcodes.tsv.gz")
    spatial_path = os.path.join(path, "spatial.txt.gz")
    matrix_path = os.path.join(path, "matrix.mtx.gz")
    # Prepare dataframes for writing
    genes_df = pd.DataFrame({
        'gene_name': adata.var.index,
        'feature_types': "Gene Expression"
    }, index=['g' + str(i + 1) for i in range(len(adata.var.index))])  # Set gene_id as index
    barcodes_with_spatial = adata.obs[['m_spatial_x', 'm_spatial_y']]

    # Ensure that adata.X is a sparse matrix
    if not isinstance(adata.X, scipy.sparse.spmatrix):
        adata.X = scipy.sparse.csr_matrix(adata.X)
    # Check equality of the float and integer representations
    float_matrix = adata.X
    int_matrix = float_matrix.astype(int)
    # If the two matrices are equal, convert to integer
    if (float_matrix != int_matrix).nnz == 0:
        adata.X = int_matrix  # Set X to its integer representation
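
    # Why this check works: comparing a sparse float matrix with its int-cast copy gives
    # a sparse boolean matrix whose stored entries mark positions that differ, so
    # nnz == 0 means every stored value was already integral. A small illustration:
    # csr_matrix([[1.0, 2.5]]) != csr_matrix([[1, 2]]) has nnz == 1 (only 2.5 differs).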

    compression_parameters = {'method': 'gzip', 'mtime': 1}

    def write_genes():
        genes_df.to_csv(
            genes_path,
            sep='\t',
            index=True,  # Write index as well, which will now be gene_id
            header=False,
            compression=compression_parameters
        )

    def write_barcodes():
        # Write only the index (barcodes) to the file, without a header
        barcodes_with_spatial.index.to_series().to_csv(
            barcodes_path,
            sep='\t',
            index=False,
            header=False,
            compression=compression_parameters
        )

    def write_spatial():
        barcodes_with_spatial.to_csv(
            spatial_path,
            sep='\t',
            compression={**compression_parameters, 'compresslevel': 1}
        )

    def write_matrix():
        with xopen(matrix_path, 'wb') as f:
            fmm.mmwrite(f, adata.X.T)

    # Use ThreadPoolExecutor to write the four files concurrently
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(write_genes): "genes",
            executor.submit(write_barcodes): "barcodes",
            executor.submit(write_spatial): "spatial",
            executor.submit(write_matrix): "matrix"
        }
        for future in as_completed(futures):
            task_name = futures[future]
            future.result()  # Get result to raise exceptions if any
# export_adata(combined_adata, 'path/to/output_directory')
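
# A minimal round-trip check, sketched here as a commented example only; the output
# directory 'combined_mtx4/raw' is hypothetical and assumes export_adata() has already
# been run with that path:
#
#   with xopen(os.path.join('combined_mtx4', 'raw', 'matrix.mtx.gz'), 'rb') as f:
#       m = fmm.mmread(f)        # genes x barcodes, because adata.X.T was written
#   print(m.shape, m.dtype)      # should match (n_vars, n_obs) of the exported AnnData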


def cs_read(incsv, dgepath):
    csdf = pd.read_csv(incsv)
    csdf['h5ad_path'] = pd.NA
    pattern = 'dge.all.polyA_adapter_trimmed.mm_included.spatial_beads_*.h5ad'
    search_files = glob.glob(os.path.join(dgepath, pattern))
    for file_path in search_files:
        base_name = os.path.basename(file_path)
        # The puck id is the filename component following 'spatial_beads_'
        puck_id = base_name.split('.')[-2].replace('spatial_beads_', '')
        if puck_id in csdf['puck_id'].values:
            # Get the corresponding row and record where its h5ad file lives
            row_index = csdf[csdf['puck_id'] == puck_id].index[0]
            csdf.at[row_index, 'h5ad_path'] = file_path
            x_offset = csdf.at[row_index, 'x_offset']
            y_offset = csdf.at[row_index, 'y_offset']
            # print(f"[{puck_id}, {x_offset}, {y_offset}] {file_path}")
    # Keep only pucks for which an h5ad file was found; the z offset is not used
    csdfused = csdf[csdf['h5ad_path'].notna()].drop(columns=['z_offset'])
    return csdfused


def sp_concat(csdf):
    adatas = []
    for index, row in csdf.iterrows():
        h5ad_path = row['h5ad_path']
        adata = sc.read_h5ad(h5ad_path)
        logger.info(f'Load anndata {row["puck_id"]}: {adata.shape}')
        # Apply x_offset and y_offset to shift the spatial coordinates
        x_offset = row['x_offset']
        y_offset = row['y_offset']
        if 'spatial' in adata.obsm.keys():
            # Create a 1D array for the offsets and add it to every coordinate pair
            offsets = np.array([x_offset, y_offset])
            adata.obsm['spatial'] += offsets  # Adjust the spatial coordinates
            logger.info(f'Shift anndata {row["puck_id"]}: x+={x_offset}, y+={y_offset}')
        # Append the modified AnnData object to the list
        adatas.append(adata)
    # Step 2: Use anndata.concat to merge all AnnData objects with an outer join on .var
    combined_adata = ad.concat(adatas, join='outer', label='source')
    return combined_adata
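
# Sketch of what the concat above does, on toy data (illustrative only; a and b are
# hypothetical AnnData objects): join='outer' keeps the union of genes across pucks,
# filling genes absent from a puck with zeros in the sparse matrix, and label='source'
# adds an obs column recording which input each bead came from.
#
#   a = ad.AnnData(scipy.sparse.csr_matrix([[1., 0.]]), obs=pd.DataFrame(index=['b1']),
#                  var=pd.DataFrame(index=['g1', 'g2']))
#   b = ad.AnnData(scipy.sparse.csr_matrix([[2., 3.]]), obs=pd.DataFrame(index=['b2']),
#                  var=pd.DataFrame(index=['g2', 'g3']))
#   merged = ad.concat([a, b], join='outer', label='source')
#   list(merged.var_names)           # ['g1', 'g2', 'g3']
#   merged.obs['source'].tolist()    # ['0', '1']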


def main():
    if len(sys.argv) < 4:
        print(f'Usage: {sys.argv[0]} <coordinate_system.csv> <spacemake dge path> <outpath>',
              file=sys.stderr, flush=True)
    elif len(sys.argv) >= 4:
        incsv = sys.argv[1]
        dgepath = sys.argv[2]
        outpath = sys.argv[3]
        logger.info(f'Coordinate:[{incsv}],DGE:[{dgepath}] => [{outpath}]/xxx')
        csdf = cs_read(incsv, dgepath)
        logger.info(f'Load Coordinate_system: {len(csdf)}')
        os.makedirs(outpath, exist_ok=True)
        combined_adata = sp_concat(csdf)
        logger.info(f'Merged anndata: {combined_adata.shape}')
        h5adfilename = os.path.join(outpath, 'raw_puck_collection.h5ad')
        combined_adata.write_h5ad(h5adfilename, compression='lzf')
        logger.info(f'Saved anndata: {h5adfilename}')
        mtxpath = os.path.join(outpath, 'raw')
        export_adata(combined_adata, mtxpath)
        logger.info(f'Exported anndata: {mtxpath}')


if __name__ == "__main__":
    main()  # ./openstmerge.py puck_data/fc_2_coordinate_system.csv projects/mbrain/processed_data/openst_musbrain/illumina/complete_data/dge combined_mtx4
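
# Expected layout under <outpath> after a successful run, using the example output
# directory from the usage comment above ('combined_mtx4'):
#   combined_mtx4/raw_puck_collection.h5ad
#   combined_mtx4/raw/features.tsv.gz
#   combined_mtx4/raw/barcodes.tsv.gz
#   combined_mtx4/raw/spatial.txt.gz
#   combined_mtx4/raw/matrix.mtx.gz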