I am developing a python script which, given:
should:
substitute string A with string B in the cells of the indicated columns.
However, I am having trouble with writing the changed row in the output csv, because csv.writer encloses the modified lines in double quotes, while I don't want thema, but I don't understand how to handle the quoting.
Note: I cannot use pandas.
soruce file:
code1;code2;money1;code3;type_payment;money2
74;1;185.04;10;AMEXCO;36.08
74;1;8.06;11;MASTERCARD;538.30
74;1;892.46;12;VISA;185.04
74;1;75.10;15;MAESTRO;8.06
74;1;63.92;16;BANCOMAT;892.46
desired output:
code1;code2;money1;code3;type_payment;money2
74;1;185,04;10;AMEXCO;36,08
74;1;8,06;11;MASTERCARD;538,30
74;1;892,46;12;VISA;185,04
74;1;75,10;15;MAESTRO;8,06
74;1;63,92;16;BANCOMAT;892,46
actual output:
code1;code2;money1;code3;type_payment;money2
"74;1;185,04;10;AMEXCO;36,08"
"74;1;8,06;11;MASTERCARD;538,30"
"74;1;892,46;12;VISA;185,04"
"74;1;75,10;15;MAESTRO;8,06"
"74;1;63,92;16;BANCOMAT;892,46"
my script:
result = {}
csv_file_path = 'myreport.csv'
columns_to_process = ['money1', 'money2']
string_to_be_replaced = "."
string_to_replace_with = ","
mydelimiter = ";"
#--------------------------
# specific import for csv
import csv, io
# import operator
import shutil
# specific import to manage errors
import os, traceback
# define custom errors
class DelimiterManagementError(Exception):
"""This one is raised if the row splitted into single cells have a length greater than the length of the header, for which we assume the delimiter is not inside the cells of the header"""
# check file existence
if not os.path.isfile(csv_file_path):
raise IOError("csv_file_path is not valid or does not exists: {}".format(csv_file_path))
# check the delimiter existence
with open(csv_file_path, 'r') as csvfile:
first_line = csvfile.readline()
# print("first_line", first_line)
if mydelimiter not in first_line:
delimiter_warning_message = "No delimiter found in file first line."
result['warning_messages'].append(delimiter_warning_message)
# count the lines in the source file
NOL = sum(1 for _ in io.open(csv_file_path, "r"))
# print("NOL:", NOL)
# if NOL = 0 -> void file
# if NOL = 1 -> header-only file
if NOL > 0:
# just get the columns names, then close the file
#-----------------------------------------------------
with open(csv_file_path, 'r') as csvfile:
columnslist = csv.DictReader(csvfile, delimiter=mydelimiter)
list_of_dictcolumns = []
# loop to iterate through the rows of csv
for row in columnslist:
# adding the first row
list_of_dictcolumns.append(row)
# breaking the loop after the
# first iteration itself
break
# transform the colum names into a list
first_dictcolumn = list_of_dictcolumns[0]
list_of_column_names = list(first_dictcolumn.keys())
number_of_columns = len(list_of_column_names)
# check columns existence
#------------------------
column_existence = [ (column_name in list_of_column_names ) for column_name in columns_to_process ]
if not all(column_existence):
raise ValueError("File {} does not contains all the columns given in input for processing:\nFile columns names: {}\nInput columns names: {}".format(csv_file_path, list_of_column_names, columns_to_process))
# determine the indexes of the columns to process
indexes_of_columns_to_process = [i for i, column_name in enumerate(list_of_column_names) if column_name in columns_to_process]
print("indexes_of_columns_to_process: ", indexes_of_columns_to_process)
# build the path of a to-be-generated duplicate file to be used as output
inputcsv_absname, inputcsv_extension = os.path.splitext(csv_file_path)
csv_output_file_path = inputcsv_absname + '__output' + inputcsv_extension
# define the processing function
def replace_string_in_columns(input_csv, output_csv, indexes_of_columns_to_process, string_to_be_replaced, string_to_replace_with):
number_of_replacements = 0
with open(input_csv, 'r', newline='') as infile, open(output_csv, 'w', newline='') as outfile:
reader = csv.reader(infile)
writer = csv.writer(outfile)
# writer = csv.writer(outfile, delimiter=mydelimiter, escapechar='\\', quoting=csv.QUOTE_NONE)
row_index=0
for row in reader:
for col_index in indexes_of_columns_to_process:
# break the processing when empty lines at the end of the file are reached
if len(row) == 0:
break
# get the single cell and analyze it
#-------------------------------------
list_of_cells = row[0].split(mydelimiter)
# WARNING: in case the inspecting cell contains the delimiter, the split will return more columns.
if len(list_of_cells) != number_of_columns:
raise DelimiterManagementError("In row {}: {}, the number of splitted cells is {}, but the number of columns (in header) is {}.".format(row_index, row, len(list_of_cells), number_of_columns))
cell = list_of_cells[col_index]
columns_before = list_of_cells[:col_index]
columns_after = list_of_cells[(col_index + 1):]
print("col_index: ", col_index)
print("row: ", row)
# print("list_of_cells: ", list_of_cells)
print("cell: ", cell)
# print("columns_before: ", columns_before)
# print("columns_after: ", columns_after)
if string_to_be_replaced in cell and row_index != 0:
# do the substitution in the cell
cell = cell.replace(string_to_be_replaced, string_to_replace_with)
number_of_replacements = number_of_replacements + 1
print("number_of_replacements: ", number_of_replacements)
# sew the row up agian
list_of_cells_replaced = columns_before + [ cell ] + columns_after
string_of_cells_replaced = mydelimiter.join(list_of_cells_replaced)
row_of_cells_replaced = [ string_of_cells_replaced ]
row = row_of_cells_replaced
print("substitutiion done: ", cell)
print("list_of_cells_replaced: ", list_of_cells_replaced)
print("string_of_cells_replaced: ", string_of_cells_replaced)
# write / copy the row in the new file
writer.writerow(row)
print("written row: ", row, "index: ", row_index)
row_index=row_index+1
return number_of_replacements
# launch the function
result['number_of_modified_cells'] = replace_string_in_columns(csv_file_path, csv_output_file_path, indexes_of_columns_to_process, string_to_be_replaced, string_to_replace_with)
# replace the old csv with the new one
shutil.copyfile(csv_output_file_path, csv_file_path)
os.remove(csv_output_file_path)
if result['number_of_modified_cells'] > 0:
result['changed'] = True
else:
result['changed'] = False
else:
result['changed'] = False
result['source_csv_number_of_raw_lines'] = NOL
result['source_csv_number_of_lines'] = NOL - 1
print("result:\n\n", result)
Everything got so much more complicated just because I called csv.reader
without giving the parameters delimiter
, quotechar
, escapechar
.
So I turned the lines
reader = csv.reader(infile)
writer = csv.writer(outfile)
into
reader = csv.reader(infile, quoting=csv.QUOTE_NONE, delimiter=mydelimiter, quotechar='',escapechar='\\')
writer = csv.writer(outfile, quoting=csv.QUOTE_NONE, delimiter=mydelimiter, quotechar='',escapechar='\\')
and then rewrote the further rows of the script anew.
Now it works and it is much less complicated, so that I don't have to define a custom error to be raised in case the text of any cell of the csv contains the delimiter.
Lesson learned: when using libraries, always read the documentation before starting writing the code.
result = {}
csv_file_path = 'myreport.csv'
columns_to_process = ['money1', 'money2']
string_to_be_replaced = "."
string_to_replace_with = ","
mydelimiter = ";"
#--------------------------
# specific import for csv
import csv, io
# import operator
import shutil
# specific import to manage errors
import os, traceback
# check file existence
if not os.path.isfile(csv_file_path):
raise IOError("csv_file_path is not valid or does not exists: {}".format(csv_file_path))
# check the delimiter existence
with open(csv_file_path, 'r') as csvfile:
first_line = csvfile.readline()
# print("first_line", first_line)
if mydelimiter not in first_line:
delimiter_warning_message = "No delimiter found in file first line."
result['warning_messages'].append(delimiter_warning_message)
# count the lines in the source file
NOL = sum(1 for _ in io.open(csv_file_path, "r"))
# print("NOL:", NOL)
# if NOL = 0 -> void file
# if NOL = 1 -> header-only file
if NOL > 0:
# just get the columns names, then close the file
#-----------------------------------------------------
with open(csv_file_path, 'r') as csvfile:
columnslist = csv.DictReader(csvfile, delimiter=mydelimiter)
list_of_dictcolumns = []
# loop to iterate through the rows of csv
for row in columnslist:
# adding the first row
list_of_dictcolumns.append(row)
# breaking the loop after the
# first iteration itself
break
# transform the colum names into a list
first_dictcolumn = list_of_dictcolumns[0]
list_of_column_names = list(first_dictcolumn.keys())
number_of_columns = len(list_of_column_names)
# check columns existence
#------------------------
column_existence = [ (column_name in list_of_column_names ) for column_name in columns_to_process ]
if not all(column_existence):
raise ValueError("File {} does not contains all the columns given in input for processing:\nFile columns names: {}\nInput columns names: {}".format(csv_file_path, list_of_column_names, columns_to_process))
# determine the indexes of the columns to process
indexes_of_columns_to_process = [i for i, column_name in enumerate(list_of_column_names) if column_name in columns_to_process]
print("indexes_of_columns_to_process: ", indexes_of_columns_to_process)
# build the path of a to-be-generated duplicate file to be used as output
inputcsv_absname, inputcsv_extension = os.path.splitext(csv_file_path)
csv_output_file_path = inputcsv_absname + '__output' + inputcsv_extension
# define the processing function
def replace_string_in_columns(input_csv, output_csv, indexes_of_columns_to_process, string_to_be_replaced, string_to_replace_with):
number_of_replacements = 0
with open(input_csv, 'r', newline='') as infile, open(output_csv, 'w', newline='') as outfile:
reader = csv.reader(infile, quoting=csv.QUOTE_NONE, delimiter=mydelimiter, quotechar='',escapechar='\\')
writer = csv.writer(outfile, quoting=csv.QUOTE_NONE, delimiter=mydelimiter, quotechar='',escapechar='\\')
# writer = csv.writer(outfile, delimiter=mydelimiter, escapechar='\\', quoting=csv.QUOTE_NONE)
row_index=0
for row in reader:
for col_index in indexes_of_columns_to_process:
# break the processing when empty lines at the end of the file are reached
if len(row) == 0:
break
cell = row[col_index]
columns_before = row[:col_index]
columns_after = row[(col_index + 1):]
print("col_index: ", col_index)
print("row: ", row)
# print("list_of_cells: ", list_of_cells)
print("cell: ", cell)
# print("columns_before: ", columns_before)
# print("columns_after: ", columns_after)
if string_to_be_replaced in cell and row_index != 0:
# do the substitution in the cell
cell = cell.replace(string_to_be_replaced, string_to_replace_with)
number_of_replacements = number_of_replacements + 1
print("number_of_replacements: ", number_of_replacements)
# # sew the row up agian
row_replaced = columns_before + [ cell ] + columns_after
row = row_replaced
# write / copy the row in the new file
writer.writerow(row)
print("written row: ", row, "index: ", row_index)
row_index=row_index+1
return number_of_replacements
# launch the function
result['number_of_modified_cells'] = replace_string_in_columns(csv_file_path, csv_output_file_path, indexes_of_columns_to_process, string_to_be_replaced, string_to_replace_with)
# replace the old csv with the new one
shutil.copyfile(csv_output_file_path, csv_file_path)
os.remove(csv_output_file_path)
if result['number_of_modified_cells'] > 0:
result['changed'] = True
else:
result['changed'] = False
else:
result['changed'] = False
result['source_csv_number_of_raw_lines'] = NOL
result['source_csv_number_of_lines'] = NOL - 1
print("result:\n\n", result)