Skip to content

Commit e8f41fa

Browse files
Arpan-KreetiArp-G
authored andcommitted
refactor and cleanup codebase
1 parent 5db2868 commit e8f41fa

File tree

18 files changed

+192
-297
lines changed

18 files changed

+192
-297
lines changed

apps/csv2sql/lib/csv2sql.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
defmodule Csv2sql do
22
def main(args) do
3-
# Csv2sql.Helpers.greet()
3+
Csv2sql.Helpers.greet()
44
# Load configuration varaibles dynamically for escripts, this is required
55
# since configuration variables are set to whatever they where when the
66
# escript was build and cannot be changed later

apps/csv2sql/lib/csv2sql/application.ex

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,22 @@ defmodule Csv2sql.Application do
22
use Application
33

44
def start(_type, _args) do
5-
set_validate = Application.get_env(:csv2sql, Csv2sql.MainServer)[:set_validate]
6-
set_insert_schema = Application.get_env(:csv2sql, Csv2sql.Worker)[:set_insert_schema]
7-
set_insert_data = Application.get_env(:csv2sql, Csv2sql.Worker)[:set_insert_data]
8-
95
repo_supervisor =
10-
if set_validate || set_insert_schema || set_insert_data do
11-
[Csv2sql.Repo]
12-
else
13-
[]
14-
end
6+
if Application.get_env(:csv2sql, Csv2sql.MainServer)[:set_validate] ||
7+
Application.get_env(:csv2sql, Csv2sql.Worker)[:set_insert_schema] ||
8+
Application.get_env(:csv2sql, Csv2sql.Worker)[:set_insert_data],
9+
do: [Csv2sql.Repo],
10+
else: []
1511

1612
children =
17-
[]
18-
|> Kernel.++(repo_supervisor)
19-
|> Kernel.++([
20-
Csv2sql.Observer,
21-
Csv2sql.JobQueueServer,
22-
Csv2sql.DbWorkerSupervisor,
23-
Csv2sql.WorkerSupervisor,
24-
Csv2sql.MainServer
25-
])
13+
repo_supervisor ++
14+
[
15+
Csv2sql.Observer,
16+
Csv2sql.JobQueueServer,
17+
Csv2sql.DbWorkerSupervisor,
18+
Csv2sql.WorkerSupervisor,
19+
Csv2sql.MainServer
20+
]
2621

2722
opts = [strategy: :one_for_one, name: Csv2sql.Supervisor]
2823
Supervisor.start_link(children, opts)

apps/csv2sql/lib/csv2sql/data_transfer.ex

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
defmodule Csv2sql.DataTransfer do
22
alias NimbleCSV.RFC4180, as: CSV
3+
alias Csv2sql.{JobQueueServer, Helpers}
34

5+
@doc """
6+
Divides a csv file in chunks and place them in a job queue.
7+
Whenever a DB worker is free it will pick up a chunk from the queue
8+
and insert it in the database.
9+
"""
410
def process_file(file) do
5-
Csv2sql.Helpers.print_msg("Begin data tranfer for file: " <> Path.basename(file))
11+
Helpers.print_msg("Begin data tranfer for file: " <> Path.basename(file))
612

713
insertion_chunk_size = Application.get_env(:csv2sql, Csv2sql.Repo)[:insertion_chunk_size]
814

@@ -17,28 +23,35 @@ defmodule Csv2sql.DataTransfer do
1723
wait_for_file_transfer(file)
1824
end
1925

26+
27+
# Wait until all chunks for the current file in the job queue has been processed
28+
# `:timer.sleep(300)` waits for the last chunk in queue to get inserted that is
29+
# if no, chunks were present on the job queue this means a DB worker has picked
30+
# up the chunk for insertion, so we wait for 300ms for the chunk to get inserted.
2031
defp wait_for_file_transfer(file) do
2132
if Csv2sql.JobQueueServer.job_for_file_present(file) do
2233
wait_for_file_transfer(file)
2334
else
2435
imported_csv_directory =
2536
Application.get_env(:csv2sql, Csv2sql.MainServer)[:imported_csv_directory]
2637

27-
# wait for the last chunk in queue to get inserted
2838
:timer.sleep(300)
29-
File.rename!(file, "#{imported_csv_directory}/#{Path.basename(file)}")
30-
Csv2sql.Helpers.print_msg("Finished processing file: " <> Path.basename(file), :green)
39+
File.rename(file, "#{imported_csv_directory}/#{Path.basename(file)}")
40+
Helpers.print_msg("Finished processing file: " <> Path.basename(file), :green)
3141
end
3242
end
3343

44+
45+
# Wait until job queue has space for the next chunk
46+
# by recursively calling itself.
3447
defp check_job_queue(file, data_chunk) do
3548
job_count_limit = Application.get_env(:csv2sql, Csv2sql.Repo)[:job_count_limit]
36-
job_count = Csv2sql.JobQueueServer.get_job_count()
49+
job_count = JobQueueServer.get_job_count()
3750

3851
if job_count > job_count_limit do
3952
check_job_queue(file, data_chunk)
4053
else
41-
Csv2sql.JobQueueServer.add_data_chunk(file, data_chunk)
54+
JobQueueServer.add_data_chunk(file, data_chunk)
4255
end
4356
end
4457
end
Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,26 @@
11
defmodule Csv2sql.Database do
22
alias NimbleCSV.RFC4180, as: CSV
3-
alias Csv2sql.Repo
3+
alias Csv2sql.{Repo, Helpers, Observer, ErrorTracker}
44

5+
@doc """
6+
Creates the table for a csv file
7+
"""
58
def make_db_schema([drop_query, create_query]) do
6-
database_name = Application.get_env(:csv2sql, Csv2sql.Repo)[:database_name]
7-
8-
table_name =
9-
String.trim_leading(drop_query, "DROP TABLE IF EXISTS #{database_name}.")
10-
|> String.trim_trailing(";")
11-
129
try do
1310
execute_query(drop_query)
1411
execute_query(create_query)
1512
catch
1613
_, reason ->
17-
Csv2sql.ErrorTracker.add_error(reason)
14+
ErrorTracker.add_error(reason)
1815
end
1916

20-
Csv2sql.Helpers.print_msg("Create Schema for: #{table_name}")
21-
end
22-
23-
defp execute_query(query) do
24-
Ecto.Adapters.SQL.query!(Repo, query, [])
17+
log_table_created(drop_query)
2518
end
2619

20+
@doc """
21+
Inserts a chunk of data in the database
22+
"""
2723
def insert_data_chunk(file, data_chunk) do
28-
database_name = Application.get_env(:csv2sql, Csv2sql.Repo)[:database_name]
29-
3024
table_name =
3125
file
3226
|> Path.basename()
@@ -46,31 +40,34 @@ defmodule Csv2sql.Database do
4640
end)
4741

4842
try do
49-
Repo.insert_all(table_name, data_chunk, prefix: database_name)
43+
Repo.insert_all(table_name, data_chunk,
44+
prefix: Application.get_env(:csv2sql, Csv2sql.Repo)[:database_name]
45+
)
5046
catch
5147
_, reason ->
52-
Csv2sql.ErrorTracker.add_error(reason)
48+
ErrorTracker.add_error(reason)
5349
end
5450

55-
Csv2sql.Observer.update_file_status(file, :insert_data)
51+
Observer.update_file_status(file, :insert_data)
5652
end
5753

54+
@doc """
55+
Prepares Database for data insertion by creating the Database if not exists
56+
"""
5857
def prepare_db() do
5958
database_name = Application.get_env(:csv2sql, Csv2sql.Repo)[:database_name]
6059

61-
Ecto.Adapters.SQL.query!(
62-
Repo,
63-
"CREATE DATABASE IF NOT EXISTS #{database_name} CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;",
64-
[]
60+
execute_query(
61+
"CREATE DATABASE IF NOT EXISTS #{database_name} CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;"
6562
)
6663

67-
Ecto.Adapters.SQL.query!(
68-
Repo,
69-
"SET GLOBAL SQL_MODE=\"NO_BACKSLASH_ESCAPES,NO_ENGINE_SUBSTITUTION,NO_ZERO_IN_DATE\";",
70-
[]
64+
execute_query(
65+
"SET GLOBAL SQL_MODE=\"NO_BACKSLASH_ESCAPES,NO_ENGINE_SUBSTITUTION,NO_ZERO_IN_DATE\";"
7166
)
7267
end
7368

69+
defp execute_query(query), do: Ecto.Adapters.SQL.query!(Repo, query, [])
70+
7471
defp get_headers(file) do
7572
[headers] =
7673
file
@@ -81,4 +78,17 @@ defmodule Csv2sql.Database do
8178

8279
headers
8380
end
81+
82+
# Logs the table that was created
83+
# Gets the table name from the DROP query
84+
defp log_table_created(drop_query) do
85+
table_name =
86+
drop_query
87+
|> String.trim_leading(
88+
"DROP TABLE IF EXISTS #{Application.get_env(:csv2sql, Csv2sql.Repo)[:database_name]}."
89+
)
90+
|> String.trim_trailing(";")
91+
92+
Helpers.print_msg("Create Schema for: #{table_name}")
93+
end
8494
end

apps/csv2sql/lib/csv2sql/db_worker.ex

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
defmodule Csv2sql.DbWorker do
22
use GenServer
3-
4-
alias Csv2sql.JobQueueServer, as: FS
3+
alias Csv2sql.{JobQueueServer, Database}
54

65
def start_link(_) do
76
GenServer.start_link(__MODULE__, :no_args)
@@ -12,11 +11,14 @@ defmodule Csv2sql.DbWorker do
1211
{:ok, nil}
1312
end
1413

14+
@doc """
15+
Recursively requests the job queue for work(chunks of data)
16+
"""
1517
def handle_info(:start_new_db_work, _) do
16-
FS.get_work()
18+
JobQueueServer.get_work()
1719
|> case do
1820
{file, data_chunk} ->
19-
Csv2sql.Database.insert_data_chunk(file, data_chunk)
21+
Database.insert_data_chunk(file, data_chunk)
2022

2123
:no_work ->
2224
nil

apps/csv2sql/lib/csv2sql/error_tracker.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
defmodule Csv2sql.ErrorTracker do
22
use GenServer
3+
alias Csv2sql.{Observer, Helpers}
34

45
def register_supervisor(sup_pid) do
56
GenServer.cast(:error_tracker, {:register_supervisor, sup_pid})
@@ -11,9 +12,9 @@ defmodule Csv2sql.ErrorTracker do
1112
1213
#{inspect(error)}
1314
"""
14-
|> Csv2sql.Helpers.print_msg(:red)
15+
|> Helpers.print_msg(:red)
1516

16-
Csv2sql.Observer.change_stage(:error)
17+
Observer.change_stage(:error)
1718

1819
# Call genserver not cast since, we need the wait synchronously untill supervisor is stopped
1920
GenServer.call(:error_tracker, {:add_error, error})

apps/csv2sql/lib/csv2sql/helpers.ex

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,5 @@ defmodule Csv2sql.Helpers do
5858
} by Arpan.
5959
""" <> reset())
6060
|> IO.puts()
61-
62-
start_text = [:yellow_background, :black, " Starting... !"] |> format()
63-
64-
CliSpinners.spin_fun(
65-
[
66-
frames: :arrow3,
67-
text: start_text,
68-
done: ""
69-
],
70-
fn -> :timer.sleep(5000) end
71-
)
7261
end
7362
end

apps/csv2sql/lib/csv2sql/import_validator.ex

Lines changed: 40 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ defmodule Csv2sql.ImportValidator do
22
alias NimbleCSV.RFC4180, as: CSV
33
alias Csv2sql.Helpers
44

5+
@doc """
6+
Validates import by comparing row count in csv vs row count in database
7+
"""
58
def validate_import(file_list) do
69
%{stats: {total, correct, incorrect}, incorrect_files: incorrect_files} =
710
file_list
@@ -11,19 +14,18 @@ defmodule Csv2sql.ImportValidator do
1114
%{stats: {total, correct, incorrect}, incorrect_files: incorrect_files} ->
1215
Helpers.print_msg("Checking File: #{Path.basename(file)}", :yellow)
1316

14-
validated_csv_directory =
15-
Application.get_env(:csv2sql, Csv2sql.MainServer)[:validated_csv_directory]
17+
main_server_config = Application.get_env(:csv2sql, Csv2sql.MainServer)
1618

1719
src =
1820
if Application.get_env(:csv2sql, Csv2sql.Worker)[:set_insert_data],
19-
do: Application.get_env(:csv2sql, Csv2sql.MainServer)[:imported_csv_directory],
20-
else: Application.get_env(:csv2sql, Csv2sql.MainServer)[:source_csv_directory]
21+
do: main_server_config[:imported_csv_directory],
22+
else: main_server_config[:source_csv_directory]
2123

2224
result =
2325
if validate_csv(file, row_count) do
2426
File.rename(
2527
"#{src}/#{file}.csv",
26-
"#{validated_csv_directory}/#{file}.csv"
28+
"#{main_server_config[:validated_csv_directory]}/#{file}.csv"
2729
)
2830

2931
%{
@@ -41,36 +43,13 @@ defmodule Csv2sql.ImportValidator do
4143
end
4244
)
4345

44-
"""
45-
46-
#{IO.ANSI.underline()}Validation Completed:#{IO.ANSI.no_underline()}
47-
48-
"""
49-
|> Helpers.print_msg(:white)
50-
51-
white = IO.ANSI.white()
52-
53-
Helpers.print_msg("* Number of files checked: #{white}#{total}")
54-
Helpers.print_msg("* Number of files with correct count: #{white}#{correct}")
55-
Helpers.print_msg("* Number of files with incorrect count: #{white}#{incorrect}")
56-
57-
if incorrect > 0 do
58-
"""
59-
60-
61-
Files with incorrect count:
62-
"""
63-
|> Helpers.print_msg(:white)
64-
65-
incorrect_files
66-
|> Enum.each(fn file ->
67-
Helpers.print_msg("* #{Path.basename(file)}", :red)
68-
end)
69-
end
70-
46+
show_validation_results(total, correct, incorrect, incorrect_files)
7147
{incorrect, incorrect_files}
7248
end
7349

50+
@doc """
51+
Get row count in csv file
52+
"""
7453
def get_count_from_csv(file) do
7554
file
7655
|> File.stream!()
@@ -125,4 +104,33 @@ defmodule Csv2sql.ImportValidator do
125104
-1
126105
end
127106
end
107+
108+
defp show_validation_results(total, correct, incorrect, incorrect_files) do
109+
"""
110+
111+
#{IO.ANSI.underline()}Validation Completed:#{IO.ANSI.no_underline()}
112+
113+
"""
114+
|> Helpers.print_msg(:white)
115+
116+
white = IO.ANSI.white()
117+
118+
Helpers.print_msg("* Number of files checked: #{white}#{total}")
119+
Helpers.print_msg("* Number of files with correct count: #{white}#{correct}")
120+
Helpers.print_msg("* Number of files with incorrect count: #{white}#{incorrect}")
121+
122+
if incorrect > 0 do
123+
"""
124+
125+
126+
Files with incorrect count:
127+
"""
128+
|> Helpers.print_msg(:white)
129+
130+
incorrect_files
131+
|> Enum.each(fn file ->
132+
Helpers.print_msg("* #{Path.basename(file)}", :red)
133+
end)
134+
end
135+
end
128136
end

apps/csv2sql/lib/csv2sql/job_queue_server.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ defmodule Csv2sql.JobQueueServer do
4949
end
5050

5151
def handle_call({:job_for_file_present, file}, _from, state) do
52-
file_present? =
52+
file_present =
5353
Enum.any?(state, fn {file_job, _data_chunk} ->
5454
file == file_job
5555
end)
5656

57-
{:reply, file_present?, state}
57+
{:reply, file_present, state}
5858
end
5959
end

0 commit comments

Comments
 (0)