Skip to main content

Implementation Patterns

This document provides concrete code examples for implementing i3X server endpoints.

Pattern 1: Basic Object Endpoint Implementation

from flask import Flask, jsonify, request
from typing import List, Dict, Optional
import datetime

app = Flask(__name__)

class ObjectRepository:
"""Your internal data access layer"""

def get_object(self, element_id: str) -> Optional[Dict]:
"""Retrieve object from your data store"""
# Implementation specific to your platform
pass

def list_objects(
self,
type_id: Optional[str] = None,
include_metadata: bool = False
) -> List[Dict]:
"""List objects with optional type filtering"""
pass

def get_objects_by_ids(self, element_ids: List[str]) -> List[Dict]:
"""Retrieve multiple objects by their elementIds"""
pass

def get_related_objects(
self,
element_id: str,
relationship_type: Optional[str] = None
) -> List[Dict]:
"""Get objects related to a given object"""
pass

repo = ObjectRepository()

@app.route('/objects', methods=['GET'])
def list_objects():
"""List all objects with optional filtering"""
try:
# Extract query parameters
type_id = request.args.get('typeId')
include_metadata = request.args.get('includeMetadata', 'false').lower() == 'true'

# Retrieve objects from your data store
objects = repo.list_objects(type_id=type_id, include_metadata=include_metadata)

return jsonify(objects), 200

except ValueError as e:
return jsonify({'detail': [{'msg': str(e), 'type': 'value_error'}]}), 400
except Exception as e:
return jsonify({'detail': [{'msg': 'Internal server error', 'type': 'server_error'}]}), 500

@app.route('/objects/list', methods=['POST'])
def get_objects_by_ids():
"""Get objects by elementId(s)"""
try:
data = request.get_json()
element_id = data.get('elementId')
element_ids = data.get('elementIds', [])

if element_id:
element_ids = [element_id]

if not element_ids:
return jsonify({'detail': [{'loc': ['body'], 'msg': 'elementId or elementIds required', 'type': 'value_error'}]}), 422

objects = repo.get_objects_by_ids(element_ids)
return jsonify(objects), 200

except Exception as e:
return jsonify({'detail': [{'msg': 'Internal server error', 'type': 'server_error'}]}), 500

@app.route('/objects/related', methods=['POST'])
def get_related_objects():
"""Get related objects for one or more elementIds"""
try:
data = request.get_json()
element_id = data.get('elementId')
element_ids = data.get('elementIds', [])
relationship_type = data.get('relationshiptype')
include_metadata = data.get('includeMetadata', False)

if element_id:
element_ids = [element_id]

results = []
for eid in element_ids:
related = repo.get_related_objects(eid, relationship_type)
results.extend(related)

return jsonify(results), 200

except Exception as e:
return jsonify({'detail': [{'msg': 'Internal server error', 'type': 'server_error'}]}), 500

Pattern 2: Value and History Data Access

from datetime import datetime, timedelta
from typing import Optional, List

class DataRepository:
"""Your internal time-series data access layer"""

def get_object_value(
self,
element_ids: List[str],
max_depth: int = 1
) -> List[Dict]:
"""Retrieve current values for objects"""
# Your implementation here
pass

def get_object_history(
self,
element_ids: List[str],
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
max_depth: int = 1
) -> List[Dict]:
"""Retrieve historical values from your historian/database"""
pass

def update_object_value(
self,
element_id: str,
value: any
) -> bool:
"""Update an object's current value"""
pass

def update_object_history(
self,
element_id: str,
values: List[Dict]
) -> bool:
"""Update historical values for an object"""
pass

data_repo = DataRepository()

@app.route('/objects/value', methods=['POST'])
def get_object_value():
"""Get current values for one or more objects"""
try:
data = request.get_json()
element_id = data.get('elementId')
element_ids = data.get('elementIds', [])
max_depth = data.get('maxDepth', 1)

if element_id:
element_ids = [element_id]

if not element_ids:
return jsonify({'detail': [{'loc': ['body'], 'msg': 'elementId or elementIds required', 'type': 'value_error'}]}), 422

# Retrieve current values
values = data_repo.get_object_value(
element_ids=element_ids,
max_depth=max_depth
)

return jsonify(values), 200

except ValueError as e:
return jsonify({'detail': [{'msg': str(e), 'type': 'value_error'}]}), 400
except Exception as e:
app.logger.error(f"Error retrieving values: {str(e)}")
return jsonify({'detail': [{'msg': 'Internal server error', 'type': 'server_error'}]}), 500

@app.route('/objects/history', methods=['POST'])
def get_object_history():
"""Get historical values for one or more objects"""
try:
data = request.get_json()
element_id = data.get('elementId')
element_ids = data.get('elementIds', [])
start_time_str = data.get('startTime')
end_time_str = data.get('endTime')
max_depth = data.get('maxDepth', 1)

if element_id:
element_ids = [element_id]

if not element_ids:
return jsonify({'detail': [{'loc': ['body'], 'msg': 'elementId or elementIds required', 'type': 'value_error'}]}), 422

# Parse time range
start_time = None
end_time = None
if start_time_str:
start_time = datetime.fromisoformat(start_time_str.replace('Z', '+00:00'))
if end_time_str:
end_time = datetime.fromisoformat(end_time_str.replace('Z', '+00:00'))

# Retrieve historical values
history = data_repo.get_object_history(
element_ids=element_ids,
start_time=start_time,
end_time=end_time,
max_depth=max_depth
)

return jsonify(history), 200

except ValueError as e:
return jsonify({'detail': [{'msg': str(e), 'type': 'value_error'}]}), 400
except Exception as e:
app.logger.error(f"Error retrieving history: {str(e)}")
return jsonify({'detail': [{'msg': 'Internal server error', 'type': 'server_error'}]}), 500

@app.route('/objects/<element_id>/value', methods=['PUT'])
def update_object_value(element_id: str):
"""Update an object's current value"""
try:
data = request.get_json()
value = data.get('value')

success = data_repo.update_object_value(element_id, value)

if success:
return jsonify({'status': 'success'}), 200
else:
return jsonify({'detail': [{'msg': 'Failed to update value', 'type': 'server_error'}]}), 500

except Exception as e:
app.logger.error(f"Error updating value: {str(e)}")
return jsonify({'detail': [{'msg': 'Internal server error', 'type': 'server_error'}]}), 500

@app.route('/objects/<element_id>/history', methods=['PUT'])
def update_object_history(element_id: str):
"""Update historical values for an object"""
try:
data = request.get_json()
values = data.get('values', [])

success = data_repo.update_object_history(element_id, values)

if success:
return jsonify({'status': 'success', 'updated': len(values)}), 200
else:
return jsonify({'detail': [{'msg': 'Failed to update history', 'type': 'server_error'}]}), 500

except Exception as e:
app.logger.error(f"Error updating history: {str(e)}")
return jsonify({'detail': [{'msg': 'Internal server error', 'type': 'server_error'}]}), 500

Pattern 3: Authentication Middleware

from functools import wraps
from flask import request, jsonify
import jwt

class AuthService:
"""Your authentication service"""

def __init__(self, secret_key: str):
self.secret_key = secret_key

def validate_token(self, token: str) -> Optional[Dict]:
"""Validate JWT token and return user info"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None

def check_permission(self, user: Dict, resource: str, action: str) -> bool:
"""Check if user has permission for action on resource"""
# Your permission logic here
pass

auth_service = AuthService('your-secret-key')

def require_auth(f):
"""Decorator to require authentication"""
@wraps(f)
def decorated_function(*args, **kwargs):
auth_header = request.headers.get('Authorization')

if not auth_header:
return jsonify({'error': 'Authorization header required'}), 401

# Extract token
parts = auth_header.split()
if len(parts) != 2 or parts[0].lower() != 'bearer':
return jsonify({'error': 'Invalid authorization header'}), 401

token = parts[1]

# Validate token
user = auth_service.validate_token(token)
if not user:
return jsonify({'error': 'Invalid or expired token'}), 401

# Add user to request context
request.current_user = user

return f(*args, **kwargs)

return decorated_function

def require_permission(resource: str, action: str):
"""Decorator to require specific permission"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
user = getattr(request, 'current_user', None)

if not user:
return jsonify({'error': 'Unauthorized'}), 401

if not auth_service.check_permission(user, resource, action):
return jsonify({'error': 'Forbidden'}), 403

return f(*args, **kwargs)

return decorated_function
return decorator

# Usage example
@app.route('/objects', methods=['GET'])
@require_auth
@require_permission('objects', 'read')
def list_objects_protected():
# Your implementation
pass

Pattern 4: Rate Limiting

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["1000 per hour", "100 per minute"],
storage_uri="redis://localhost:6379"
)

@app.route('/objects/value', methods=['POST'])
@limiter.limit("60 per minute")
@require_auth
def get_object_value_rate_limited():
# Your implementation
pass

Pattern 5: Error Handling Middleware

from werkzeug.exceptions import HTTPException

@app.errorhandler(HTTPException)
def handle_http_exception(e):
"""Handle HTTP exceptions"""
response = {
'error': e.name,
'message': e.description,
'code': e.code,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'path': request.path
}
return jsonify(response), e.code

@app.errorhandler(Exception)
def handle_generic_exception(e):
"""Handle unexpected exceptions"""
app.logger.error(f"Unhandled exception: {str(e)}", exc_info=True)
response = {
'error': 'Internal Server Error',
'message': 'An unexpected error occurred',
'timestamp': datetime.utcnow().isoformat() + 'Z',
'path': request.path
}
return jsonify(response), 500

Pattern 6: Request Validation

from marshmallow import Schema, fields, validate, ValidationError

class GetObjectValueSchema(Schema):
"""Schema for getting object values"""
elementId = fields.Str()
elementIds = fields.List(fields.Str())
maxDepth = fields.Int(validate=validate.Range(min=0), load_default=1)

class GetObjectHistorySchema(Schema):
"""Schema for getting object history"""
elementId = fields.Str()
elementIds = fields.List(fields.Str())
startTime = fields.Str() # RFC 3339
endTime = fields.Str() # RFC 3339
maxDepth = fields.Int(validate=validate.Range(min=0), load_default=1)

get_value_schema = GetObjectValueSchema()

@app.route('/objects/value', methods=['POST'])
@require_auth
@require_permission('objects', 'read')
def get_object_value_validated():
"""Get object values with validation"""
try:
# Validate request data
validated_data = get_value_schema.load(request.json)

element_ids = validated_data.get('elementIds', [])
if validated_data.get('elementId'):
element_ids = [validated_data['elementId']]

if not element_ids:
return jsonify({
'detail': [{'loc': ['body'], 'msg': 'elementId or elementIds required', 'type': 'value_error'}]
}), 422

# Get values
values = data_repo.get_object_value(element_ids, validated_data.get('maxDepth', 1))

return jsonify(values), 200

except ValidationError as e:
return jsonify({
'detail': [{'loc': list(k), 'msg': v[0], 'type': 'validation_error'} for k, v in e.messages.items()]
}), 422

Pattern 7: Namespace and ObjectType Endpoints

class TypeRepository:
"""Your internal type/schema access layer"""

def list_namespaces(self) -> List[Dict]:
"""List all namespaces"""
pass

def list_object_types(self, namespace_uri: Optional[str] = None) -> List[Dict]:
"""List object types, optionally filtered by namespace"""
pass

def get_object_types_by_ids(self, element_ids: List[str]) -> List[Dict]:
"""Get object types by elementId(s)"""
pass

def list_relationship_types(self, namespace_uri: Optional[str] = None) -> List[Dict]:
"""List relationship types"""
pass

type_repo = TypeRepository()

@app.route('/namespaces', methods=['GET'])
@require_auth
def list_namespaces():
"""List all available namespaces"""
namespaces = type_repo.list_namespaces()
return jsonify(namespaces), 200

@app.route('/objecttypes', methods=['GET'])
@require_auth
def list_object_types():
"""List all object type schemas"""
namespace_uri = request.args.get('namespaceUri')
types = type_repo.list_object_types(namespace_uri)
return jsonify(types), 200

@app.route('/objecttypes/query', methods=['POST'])
@require_auth
def query_object_types():
"""Get object types by elementId(s)"""
data = request.get_json()
element_id = data.get('elementId')
element_ids = data.get('elementIds', [])

if element_id:
element_ids = [element_id]

types = type_repo.get_object_types_by_ids(element_ids)
return jsonify(types), 200

@app.route('/relationshiptypes', methods=['GET'])
@require_auth
def list_relationship_types():
"""List all relationship types"""
namespace_uri = request.args.get('namespaceUri')
rel_types = type_repo.list_relationship_types(namespace_uri)
return jsonify(rel_types), 200

except Exception as e:
return jsonify({'error': 'Batch operation failed', 'message': str(e)}), 500

Pattern 8: Subscription Endpoints

import uuid
from datetime import datetime

class SubscriptionRepository:
"""Your subscription management layer"""

def __init__(self):
self.subscriptions = {}

def create_subscription(self) -> str:
"""Create a new subscription"""
sub_id = str(uuid.uuid4())
self.subscriptions[sub_id] = {
'id': sub_id,
'created': datetime.utcnow().isoformat() + 'Z',
'monitored_items': [],
'queue': []
}
return sub_id

def get_subscription(self, sub_id: str) -> Optional[Dict]:
return self.subscriptions.get(sub_id)

def delete_subscription(self, sub_id: str) -> bool:
if sub_id in self.subscriptions:
del self.subscriptions[sub_id]
return True
return False

def register_items(self, sub_id: str, element_ids: List[str], max_depth: int = 1):
if sub_id in self.subscriptions:
self.subscriptions[sub_id]['monitored_items'].extend(element_ids)

def unregister_items(self, sub_id: str, element_ids: List[str]):
if sub_id in self.subscriptions:
self.subscriptions[sub_id]['monitored_items'] = [
e for e in self.subscriptions[sub_id]['monitored_items']
if e not in element_ids
]

def get_and_clear_queue(self, sub_id: str) -> List[Dict]:
if sub_id in self.subscriptions:
queue = self.subscriptions[sub_id]['queue']
self.subscriptions[sub_id]['queue'] = []
return queue
return []

sub_repo = SubscriptionRepository()

@app.route('/subscriptions', methods=['GET'])
@require_auth
def list_subscriptions():
"""List all subscriptions"""
summaries = [
{'subscriptionId': s['id'], 'created': s['created']}
for s in sub_repo.subscriptions.values()
]
return jsonify({'subscriptionIds': summaries}), 200

@app.route('/subscriptions', methods=['POST'])
@require_auth
def create_subscription():
"""Create a new subscription"""
sub_id = sub_repo.create_subscription()
return jsonify({
'subscriptionId': sub_id,
'message': 'Subscription created successfully'
}), 201

@app.route('/subscriptions/<sub_id>', methods=['GET'])
@require_auth
def get_subscription(sub_id: str):
"""Get subscription details"""
sub = sub_repo.get_subscription(sub_id)
if not sub:
return jsonify({'detail': [{'msg': 'Subscription not found', 'type': 'not_found'}]}), 404
return jsonify(sub), 200

@app.route('/subscriptions/<sub_id>', methods=['DELETE'])
@require_auth
def delete_subscription(sub_id: str):
"""Delete a subscription"""
if sub_repo.delete_subscription(sub_id):
return '', 204
return jsonify({'detail': [{'msg': 'Subscription not found', 'type': 'not_found'}]}), 404

@app.route('/subscriptions/<sub_id>/register', methods=['POST'])
@require_auth
def register_monitored_items(sub_id: str):
"""Register objects to monitor"""
data = request.get_json()
element_ids = data.get('elementIds', [])
max_depth = data.get('maxDepth', 1)

if not element_ids:
return jsonify({'detail': [{'loc': ['body', 'elementIds'], 'msg': 'elementIds required', 'type': 'value_error'}]}), 422

sub_repo.register_items(sub_id, element_ids, max_depth)
return jsonify({'status': 'registered', 'count': len(element_ids)}), 200

@app.route('/subscriptions/<sub_id>/unregister', methods=['POST'])
@require_auth
def unregister_monitored_items(sub_id: str):
"""Unregister objects from monitoring"""
data = request.get_json()
element_ids = data.get('elementIds', [])

sub_repo.unregister_items(sub_id, element_ids)
return jsonify({'status': 'unregistered', 'count': len(element_ids)}), 200

@app.route('/subscriptions/<sub_id>/stream', methods=['GET'])
@require_auth
def subscription_stream(sub_id: str):
"""SSE stream for real-time updates"""
def generate():
while True:
# Your implementation to yield SSE events
# yield f"data: {json.dumps(update)}\\n\\n"
pass
return app.response_class(generate(), mimetype='text/event-stream')

@app.route('/subscriptions/<sub_id>/sync', methods=['POST'])
@require_auth
def subscription_sync(sub_id: str):
"""Get and clear queued updates"""
updates = sub_repo.get_and_clear_queue(sub_id)
return jsonify(updates), 200

Best Practices

1. Consistent Error Handling

Always return errors in a consistent format:

def create_error_response(error_type: str, message: str, status_code: int, details: Dict = None):
"""Create a standardized error response"""
response = {
'error': error_type,
'message': message,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'path': request.path
}
if details:
response['details'] = details

return jsonify(response), status_code

2. Request Logging

Log all requests for debugging and audit purposes:

import time

@app.before_request
def before_request_logging():
request.start_time = time.time()
app.logger.info(f"Request: {request.method} {request.path}")

@app.after_request
def after_request_logging(response):
duration = time.time() - request.start_time
app.logger.info(
f"Response: {response.status_code} "
f"Duration: {duration*1000:.2f}ms"
)
return response

3. Connection Pooling

Use connection pooling for database access:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
'postgresql://user:pass@localhost/db',
poolclass=QueuePool,
pool_size=20,
max_overflow=10,
pool_timeout=30,
pool_recycle=3600
)

4. Async Operations

For long-running operations, use async patterns:

from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379')

@celery.task
def process_large_dataset(entity_id, params):
"""Process large dataset asynchronously"""
# Your processing logic here
pass

@app.route('/objects/<element_id>/process', methods=['POST'])
@require_auth
def start_processing(element_id: str):
"""Start asynchronous processing"""
params = request.get_json()

task = process_large_dataset.delay(element_id, params)

return jsonify({
'taskId': task.id,
'status': 'processing',
'statusUrl': f'/tasks/{task.id}'
}), 202