
Andrés Baamonde Lozano


Creating a json repository

Intro

A few days ago, I started refactoring and reimplementing a tiny JSON repository that I created at work. The main goal was to provide a fast way to store a few objects without a database.

I think the implementation could be improved, but it works, and as a first approach it is valid. I will continue working on it over the next months.

The implementation is quite simple: a repository uses a context to store data. There is a usage example at the end of the post, and in following posts I will detail other ways to use it.
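To make the storage side concrete, here is a minimal sketch of the file layout the context works with: one file per entity, named `<entity_name>s.json`, holding the entity name and a `values` list. The temporary folder and the sample record are assumptions made only for illustration.

```python
import json
import os
import tempfile

# Hypothetical entity name and record, used only for illustration.
entity_name = "foo"
record = {"id": "1", "foo": "a foo value"}

with tempfile.TemporaryDirectory() as folder:
    # The context names its file "<entity_name>s.json".
    file_path = os.path.join(folder, "{0}s.json".format(entity_name))

    # Same envelope the context's commit() writes: entity name plus a list of values.
    with open(file_path, "w") as f:
        json.dump({"name": entity_name, "values": [record]}, f)

    # Opening a session loads the "values" list back into memory.
    with open(file_path) as f:
        loaded = json.load(f)["values"]

file_name = os.path.basename(file_path)
print(file_name, loaded)
```

Everything between opening and committing happens on the in-memory list, so changes only reach the file when `commit()` is called.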

Context

```python
import os
import json
import uuid
import threading
from ..errors.entity_not_found import EntityNotFound


class JsonContext(object):
    def __init__(self, entity_name, filepath="./", key_field="id",
                 key_generate_func=lambda: str(uuid.uuid4())):
        # One file per entity: "<entity_name>s.json" inside filepath.
        self.file_path = os.path.join(filepath, "{0}s.json".format(entity_name))
        self.entity_name = entity_name
        self.session_values = []
        # Create an empty file the first time the entity is used.
        if not os.path.exists(self.file_path):
            self.commit()
        self.key_field = key_field
        self.key_generate_func = key_generate_func
        self.lock = threading.Lock()

    def open(self):
        # Hold the lock for the whole session, then load the stored values.
        self.lock.acquire()
        try:
            with open(self.file_path) as f:
                data = json.load(f)
            self.session_values = data["values"]
        except Exception:
            # Do not keep the lock if the file cannot be read.
            self.lock.release()
            raise

    def close(self):
        self.lock.release()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def exists(self, identifier):
        matches = list(filter(
            lambda x: x[self.key_field] == identifier, self.session_values))
        return len(matches) > 0

    def add(self, entity):
        if (self.key_field in entity and
                self.exists(entity[self.key_field])):
            # Known key: drop the stored version so the new one replaces it.
            self.delete(entity)
        else:
            # Missing or unknown key: assign a freshly generated one.
            entity[self.key_field] = self.key_generate_func()
        self.session_values.append(entity)
        return entity

    def delete(self, entity):
        self.session_values = list(filter(
            lambda x: x[self.key_field] != entity[self.key_field],
            self.session_values))

    def commit(self):
        # Write the in-memory values back to the file.
        with open(self.file_path, 'w+') as f:
            f.write(json.dumps({"name": self.entity_name,
                                "values": self.session_values}))

    def find(self, query_function):
        return list(filter(query_function, self.session_values))

    def get(self, identifier):
        if not self.exists(identifier):
            raise EntityNotFound("identifier not found")
        return self.find(lambda x: x[self.key_field] == identifier)[0]

    def get_all(self):
        return self.session_values
```
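The `add` method works as both insert and update: an entity whose key is already stored replaces the stored version, and anything else gets a newly generated key. As a rough sketch, the same logic on a plain list looks like this. The `upsert` helper is hypothetical and not part of the library.

```python
import uuid


def upsert(values, entity, key_field="id",
           key_func=lambda: str(uuid.uuid4())):
    """Hypothetical helper mirroring the add() logic on a plain list."""
    if key_field in entity and any(
            v[key_field] == entity[key_field] for v in values):
        # Known key: remove the stored version so the new one replaces it.
        values[:] = [v for v in values if v[key_field] != entity[key_field]]
    else:
        # Missing or unknown key: assign a freshly generated one.
        entity[key_field] = key_func()
    values.append(entity)
    return entity


store = []
created = upsert(store, {"foo": "first"})
updated = upsert(store, {"id": created["id"], "foo": "second"})
print(len(store), store[0]["foo"])
```

One detail that follows from this logic: an entity that arrives with a key that is not stored yet does not keep that key, it receives a generated one.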

Repository

```python
from ..context.json_context import JsonContext


class BaseJsonRepository(object):
    def __init__(self, entity_name, filepath="./"):
        self.context = JsonContext(entity_name, filepath=filepath)

    def __enter__(self):
        self.context.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.context.close()

    def insert(self, entity):
        return self.context.add(entity)

    def update(self, entity):
        return self.context.add(entity)

    def delete(self, entity):
        self.context.delete(entity)

    def get_all(self):
        return self.context.get_all()

    def get(self, identifier):
        return self.context.get(identifier)

    def find(self, function):
        return self.context.find(function)
```
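Entering the repository acquires the context's `threading.Lock`, and leaving it releases the lock. A small sketch of what that implies, using only the standard library: the lock is not reentrant, so entering the same repository instance a second time before leaving it would block. The non-blocking probe below is only there to show this without hanging.

```python
import threading

lock = threading.Lock()  # the same primitive the context uses

lock.acquire()                                # what entering the repository does
second_try = lock.acquire(blocking=False)     # a nested enter would wait here
lock.release()                                # what leaving the repository does

after_release = lock.acquire(blocking=False)  # free again once released
lock.release()
print(second_try, after_release)
```

Note that each context instance creates its own lock, so it coordinates threads sharing one repository object. It is an in-process lock and is not a file lock between separate processes.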

Example

For more examples, take a look at the tests :)

```python
from json_repository.repositories.base_json_repository import BaseJsonRepository
from json_repository.errors.entity_not_found import EntityNotFound


class FoobarRepository(BaseJsonRepository):
    def __init__(self):
        # Entities are stored in "./foos.json".
        super(FoobarRepository, self).__init__("foo")


with FoobarRepository() as repo:
    value = repo.insert({
        "foo": "a foo value",
        "bar": "a bar value"
    })
    # Nothing is written to disk until commit() is called.
    repo.context.commit()
```
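Queries work by passing a function to `find`, which keeps every entity for which the function returns a true value. The sketch below shows the kind of predicates that could be used, run against a plain list so that it is self-contained. The sample values and the standalone `find` function are assumptions for illustration.

```python
# Hypothetical stored values, shaped like the list the context keeps in memory.
values = [
    {"id": "1", "foo": "a foo value", "bar": "a bar value"},
    {"id": "2", "foo": "another foo value", "bar": "a bar value"},
]


def find(query_function):
    # Same idea as the context's find(): keep entities the predicate accepts.
    return list(filter(query_function, values))


starting_with_another = find(lambda x: x["foo"].startswith("another"))
by_id = find(lambda x: x["id"] == "1")[0]
no_match = find(lambda x: x["bar"] == "missing")
print(starting_with_another, by_id, no_match)
```

With the repository, the same lambdas would go to `repo.find(...)`. For a lookup by key, `repo.get(identifier)` does the filtering itself and raises `EntityNotFound` when the identifier is not stored.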

Links

GitHub
PyPI

Any feedback about this 'toy' is welcome; leave a comment below :).
