Complete OOP solutions for Data Engineering exercises

Here are complete OOP solutions for the exercises related to Data Engineering


✅ 1. DataSource (Inheritance)

class DataSource:
    """Base class for all data sources; subclasses override read()."""

    def __init__(self, name):
        self.name = name

    def read(self):
        print(f"Reading from {self.name}")


class CSVSource(DataSource):
    """Source specialised for CSV files."""

    def read(self):
        print(f"Reading CSV file from {self.name}")


class JSONSource(DataSource):
    """Source specialised for JSON files."""

    def read(self):
        print(f"Reading JSON file from {self.name}")


# Test
source = CSVSource("data.csv")
source.read()

✅ 2. DataTransformer

class DataTransformer:
    """Interface for in-memory data transformations."""

    def transform(self, data):
        raise NotImplementedError


class UpperCaseTransformer(DataTransformer):
    """Upper-case every string in the input list."""

    def transform(self, data):
        return [item.upper() for item in data]


class RemoveNullTransformer(DataTransformer):
    """Drop None entries from the input list."""

    def transform(self, data):
        return [item for item in data if item is not None]


# Test
sample = ["hello", None, "world"]
print(RemoveNullTransformer().transform(sample))

✅ 3. DataWriter

class DataWriter:
    """Interface for data sinks; subclasses override write()."""

    def write(self, data):
        raise NotImplementedError


class S3Writer(DataWriter):
    """Sink that (pretends to) write to S3."""

    def write(self, data):
        print(f"Writing to S3: {data}")


class LocalFileWriter(DataWriter):
    """Sink that (pretends to) write to a local file."""

    def write(self, data):
        print(f"Writing to local file: {data}")


class DatabaseWriter(DataWriter):
    """Sink that (pretends to) insert into a database."""

    def write(self, data):
        print(f"Inserting into database: {data}")

✅ 4. ETL Pipeline

class ETLPipeline:
    """Wire one source, one transformer and one writer into a run() call.

    The source's read() is expected to return the raw batch; the
    transformer's transform() produces the cleaned batch, which is
    handed to the writer's write().
    """

    def __init__(self, source, transformer, writer):
        self.source = source
        self.transformer = transformer
        self.writer = writer

    def run(self):
        """Extract, transform, then load — in that order."""
        raw = self.source.read()
        self.writer.write(self.transformer.transform(raw))

Example usage:

class DummySource:
    """In-memory source used to exercise the pipeline without real I/O."""

    def read(self):
        return ["hello", None, "world"]


etl = ETLPipeline(
    source=DummySource(),
    transformer=RemoveNullTransformer(),
    writer=LocalFileWriter(),
)
etl.run()

✅ 5. Schema Validator

class SchemaValidator:
    """Validate that records carry every schema key with the right type.

    Extra keys in the record are allowed; only the schema's keys are
    checked.
    """

    def __init__(self, schema):
        # schema maps field name -> expected Python type
        self.schema = schema

    def validate(self, record):
        """Return True when every schema field exists and matches its type."""
        return all(
            key in record and isinstance(record[key], dtype)
            for key, dtype in self.schema.items()
        )


schema = {"name": str, "age": int}
validator = SchemaValidator(schema)
print(validator.validate({"name": "Naim", "age": 30}))

✅ 6. Singleton Logger

class Logger:
    """Process-wide singleton logger: every Logger() returns one instance."""

    _instance = None  # the single shared instance, created lazily

    def __new__(cls):
        # First call allocates the instance; later calls reuse it.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def log(self, message):
        print(f"[LOG]: {message}")


# Test
first = Logger()
second = Logger()
print(first is second)  # True

✅ 7. Abstract Base Classes

from abc import ABC, abstractmethod


class AbstractSource(ABC):
    """Contract for anything data can be read from."""

    @abstractmethod
    def read(self):
        pass


class AbstractTransformer(ABC):
    """Contract for anything that transforms a batch of data."""

    @abstractmethod
    def transform(self, data):
        pass


class AbstractWriter(ABC):
    """Contract for anything a batch of data can be written to."""

    @abstractmethod
    def write(self, data):
        pass

✅ 8. Transformer Pipeline (Multiple Transformers)

class Pipeline:
    """Apply a sequence of transformers to a batch, in the given order."""

    def __init__(self, transformers):
        self.transformers = transformers

    def run(self, data):
        """Feed the batch through each transformer and return the result."""
        for step in self.transformers:
            data = step.transform(data)
        return data
# Chain order matters: strip None values first, then upper-case the rest.
steps = [RemoveNullTransformer(), UpperCaseTransformer()]
pipeline = Pipeline(steps)
print(pipeline.run(["hello", None, "world"]))

✅ 9. SCD Type 2 Processor (Simplified)

class SCDType2Processor:
    """Simplified SCD Type 2: append changed records, keep old versions.

    Records are dicts carrying at least an "id" key; history is a flat
    list that may hold several versions of the same id.
    """

    def detect_changes(self, old_data, new_data):
        """Return new records whose id exists in history but whose content
        differs from the current version of that id.

        Fixes two defects of the earlier nested-loop version:
        - a changed record was appended once per differing historical
          row, so ids with multiple old versions produced duplicates;
        - the comparison was O(len(old) * len(new)); indexing old rows
          by id makes it O(len(old) + len(new)).
        """
        # Last row per id wins: in an append-only SCD2 history the most
        # recently appended row is the current version.
        current = {record["id"]: record for record in old_data}
        return [
            record
            for record in new_data
            if record["id"] in current and record != current[record["id"]]
        ]

    def apply_scd(self, old_data, new_data):
        """Return history with detected changes appended (non-mutating).

        New ids absent from history are ignored — this simplified model
        only versions already-known ids.
        """
        return old_data + self.detect_changes(old_data, new_data)


old = [{"id": 1, "name": "A"}]
new = [{"id": 1, "name": "B"}]
scd = SCDType2Processor()
print(scd.apply_scd(old, new))

✅ 10. Kafka Consumer (OOP)

class KafkaConsumer:
    """Toy Kafka consumer: connect, pull a fixed batch, process it."""

    def connect(self):
        print("Connecting to Kafka")

    def consume(self):
        """Announce consumption and return a canned batch of messages."""
        print("Consuming messages")
        return ["msg1", "msg2"]

    def process(self):
        for msg in self.consume():
            print(f"Processing {msg}")


class JSONKafkaConsumer(KafkaConsumer):
    """Variant that labels each message as JSON while processing."""

    def process(self):
        for msg in self.consume():
            print(f"Processing JSON {msg}")

🔥 Final Real Framework Example

class Pipeline:
    """Full framework pipeline: source -> transformer chain -> writer.

    Progress is reported through the shared singleton Logger.
    """

    def __init__(self, source, transformers, writer):
        self.source = source
        self.transformers = transformers
        self.writer = writer
        self.logger = Logger()  # singleton, shared across all pipelines

    def run(self):
        """Read the batch, apply each transformer in order, write the result."""
        self.logger.log("Pipeline started")
        data = self.source.read()
        for step in self.transformers:
            data = step.transform(data)
        self.writer.write(data)
        self.logger.log("Pipeline finished")

Usage:

class DummySource:
    """Stand-in source returning a fixed in-memory batch."""

    def read(self):
        return ["hello", None, "data"]


etl = Pipeline(
    source=DummySource(),
    transformers=[RemoveNullTransformer(), UpperCaseTransformer()],
    writer=LocalFileWriter(),
)
etl.run()

Add a comment