Here are complete OOP solutions for the Data Engineering exercises.
✅ 1. DataSource (Inheritance)
```python
class DataSource:
    def __init__(self, name):
        self.name = name

    def read(self):
        print(f"Reading from {self.name}")


class CSVSource(DataSource):
    def read(self):
        print(f"Reading CSV file from {self.name}")


class JSONSource(DataSource):
    def read(self):
        print(f"Reading JSON file from {self.name}")


# Test
csv = CSVSource("data.csv")
csv.read()
```
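Because both subclasses share the `read()` interface, they can be used interchangeably; a quick polymorphism sketch:

```python
# Both subclasses honour the same read() interface, so callers
# can treat them as plain DataSource objects.
for source in [CSVSource("data.csv"), JSONSource("data.json")]:
    source.read()  # dispatches to the subclass override
```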
✅ 2. DataTransformer
```python
class DataTransformer:
    def transform(self, data):
        raise NotImplementedError


class UpperCaseTransformer(DataTransformer):
    def transform(self, data):
        return [x.upper() for x in data]


class RemoveNullTransformer(DataTransformer):
    def transform(self, data):
        return [x for x in data if x is not None]


# Test
data = ["hello", None, "world"]
transformer = RemoveNullTransformer()
print(transformer.transform(data))
```
✅ 3. DataWriter
```python
class DataWriter:
    def write(self, data):
        raise NotImplementedError


class S3Writer(DataWriter):
    def write(self, data):
        print(f"Writing to S3: {data}")


class LocalFileWriter(DataWriter):
    def write(self, data):
        print(f"Writing to local file: {data}")


class DatabaseWriter(DataWriter):
    def write(self, data):
        print(f"Inserting into database: {data}")
```
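The writers are not exercised above; a minimal usage sketch, reusing the kind of transformed data produced in exercise 2:

```python
# Test: any DataWriter subclass can be swapped in without changing the caller
for writer in [S3Writer(), LocalFileWriter(), DatabaseWriter()]:
    writer.write(["HELLO", "WORLD"])
```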
✅ 4. ETL Pipeline
```python
class ETLPipeline:
    def __init__(self, source, transformer, writer):
        self.source = source
        self.transformer = transformer
        self.writer = writer

    def run(self):
        # Note: the source's read() must *return* the data
        # (the sources in exercise 1 only print; see DummySource below).
        data = self.source.read()
        transformed = self.transformer.transform(data)
        self.writer.write(transformed)
```
Example usage:
```python
class DummySource:
    def read(self):
        return ["hello", None, "world"]


pipeline = ETLPipeline(
    source=DummySource(),
    transformer=RemoveNullTransformer(),
    writer=LocalFileWriter(),
)
pipeline.run()
```
✅ 5. Schema Validator
```python
class SchemaValidator:
    def __init__(self, schema):
        self.schema = schema

    def validate(self, record):
        for key, dtype in self.schema.items():
            if key not in record:
                return False
            if not isinstance(record[key], dtype):
                return False
        return True


schema = {"name": str, "age": int}
validator = SchemaValidator(schema)
print(validator.validate({"name": "Naim", "age": 30}))
```
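The test above only covers the happy path; records that miss a key or carry the wrong type return `False`:

```python
print(validator.validate({"name": "Naim"}))               # False: "age" is missing
print(validator.validate({"name": "Naim", "age": "30"}))  # False: age is a str, not int
```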
✅ 6. Singleton Logger
```python
class Logger:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(Logger, cls).__new__(cls)
        return cls._instance

    def log(self, message):
        print(f"[LOG]: {message}")


# Test
logger1 = Logger()
logger2 = Logger()
print(logger1 is logger2)  # True
```
✅ 7. Abstract Base Classes
```python
from abc import ABC, abstractmethod


class AbstractSource(ABC):
    @abstractmethod
    def read(self):
        pass


class AbstractTransformer(ABC):
    @abstractmethod
    def transform(self, data):
        pass


class AbstractWriter(ABC):
    @abstractmethod
    def write(self, data):
        pass
```
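These bases are not exercised above; a short sketch showing that instantiating an ABC directly fails while a concrete subclass works (`ListSource` is a hypothetical subclass for illustration):

```python
class ListSource(AbstractSource):  # hypothetical concrete subclass
    def read(self):
        return ["a", "b"]


# AbstractSource() would raise:
# TypeError: Can't instantiate abstract class AbstractSource with abstract method read
print(ListSource().read())  # ['a', 'b']
```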
✅ 8. Transformer Pipeline (Multiple Transformers)
```python
class Pipeline:
    def __init__(self, transformers):
        self.transformers = transformers

    def run(self, data):
        for transformer in self.transformers:
            data = transformer.transform(data)
        return data


pipeline = Pipeline([
    RemoveNullTransformer(),
    UpperCaseTransformer(),
])
print(pipeline.run(["hello", None, "world"]))
```
✅ 9. SCD Type 2 Processor (Simplified)
```python
class SCDType2Processor:
    def detect_changes(self, old_data, new_data):
        # A row counts as changed when its id matches an existing row
        # but any other field differs.
        changes = []
        for new in new_data:
            for old in old_data:
                if new["id"] == old["id"] and new != old:
                    changes.append(new)
        return changes

    def apply_scd(self, old_data, new_data):
        # Simplified: keep history by appending the new version
        # alongside the old one.
        changes = self.detect_changes(old_data, new_data)
        return old_data + changes


old = [{"id": 1, "name": "A"}]
new = [{"id": 1, "name": "B"}]
scd = SCDType2Processor()
print(scd.apply_scd(old, new))
```
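The simplified version only appends changed rows; a real SCD Type 2 load also expires the superseded version. A minimal sketch of that extension, assuming `is_current`/`end_date`-style flags (the field names are illustrative, not part of the exercise):

```python
from datetime import date


class SCDType2ProcessorWithExpiry(SCDType2Processor):
    def apply_scd(self, old_data, new_data):
        changes = self.detect_changes(old_data, new_data)
        changed_ids = {row["id"] for row in changes}
        result = []
        for row in old_data:
            row = dict(row)  # avoid mutating the caller's records
            if row["id"] in changed_ids:
                # Expire the superseded version instead of dropping it.
                row["is_current"] = False
                row["end_date"] = date.today().isoformat()
            result.append(row)
        for row in changes:
            result.append({**row, "is_current": True, "end_date": None})
        return result
```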
✅ 10. Kafka Consumer (OOP)
```python
class KafkaConsumer:
    def connect(self):
        print("Connecting to Kafka")

    def consume(self):
        print("Consuming messages")
        return ["msg1", "msg2"]

    def process(self):
        messages = self.consume()
        for msg in messages:
            print(f"Processing {msg}")


class JSONKafkaConsumer(KafkaConsumer):
    def process(self):
        messages = self.consume()
        for msg in messages:
            print(f"Processing JSON {msg}")
```
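Neither consumer is run above; a quick test (these classes only simulate Kafka, nothing connects to a real broker):

```python
# Test
consumer = JSONKafkaConsumer()
consumer.connect()  # prints "Connecting to Kafka"
consumer.process()  # consumes ["msg1", "msg2"], prints "Processing JSON msg1", ...
```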
🔥 Final Real Framework Example
```python
class Pipeline:
    def __init__(self, source, transformers, writer):
        self.source = source
        self.transformers = transformers
        self.writer = writer
        self.logger = Logger()

    def run(self):
        self.logger.log("Pipeline started")
        data = self.source.read()
        for transformer in self.transformers:
            data = transformer.transform(data)
        self.writer.write(data)
        self.logger.log("Pipeline finished")
```
Usage:
```python
class DummySource:
    def read(self):
        return ["hello", None, "data"]


pipeline = Pipeline(
    source=DummySource(),
    transformers=[RemoveNullTransformer(), UpperCaseTransformer()],
    writer=LocalFileWriter(),
)
pipeline.run()
```
