初始提交,未完全测试
This commit is contained in:
86
kit/Func.py
Normal file
86
kit/Func.py
Normal file
@@ -0,0 +1,86 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal Func - Lambda Function Utilities
|
||||
"""
|
||||
|
||||
from typing import Callable, TypeVar, Generic
|
||||
|
||||
T = TypeVar('T')
|
||||
U = TypeVar('U')
|
||||
V = TypeVar('V')
|
||||
W = TypeVar('W')
|
||||
X = TypeVar('X')
|
||||
Y = TypeVar('Y')
|
||||
Z = TypeVar('Z')
|
||||
R = TypeVar('R')
|
||||
|
||||
class F00:
|
||||
"""0 parameter 0 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[], None]):
|
||||
"""Execute function with no parameters and no return"""
|
||||
func()
|
||||
|
||||
class F10(Generic[T]):
|
||||
"""1 parameter 0 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[T], None], t: T):
|
||||
"""Execute function with one parameter and no return"""
|
||||
func(t)
|
||||
|
||||
class F20(Generic[T, U]):
|
||||
"""2 parameter 0 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[T, U], None], t: T, u: U):
|
||||
"""Execute function with two parameters and no return"""
|
||||
func(t, u)
|
||||
|
||||
class F30(Generic[T, U, V]):
|
||||
"""3 parameter 0 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[T, U, V], None], t: T, u: U, v: V):
|
||||
"""Execute function with three parameters and no return"""
|
||||
func(t, u, v)
|
||||
|
||||
class F40(Generic[T, U, V, W]):
|
||||
"""4 parameter 0 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[T, U, V, W], None], t: T, u: U, v: V, w: W):
|
||||
"""Execute function with four parameters and no return"""
|
||||
func(t, u, v, w)
|
||||
|
||||
class F01(Generic[R]):
|
||||
"""0 parameter 1 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[], R]) -> R:
|
||||
"""Execute function with no parameters and return value"""
|
||||
return func()
|
||||
|
||||
class F11(Generic[T, R]):
|
||||
"""1 parameter 1 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[T], R], t: T) -> R:
|
||||
"""Execute function with one parameter and return value"""
|
||||
return func(t)
|
||||
|
||||
class F21(Generic[T, U, R]):
|
||||
"""2 parameter 1 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[T, U], R], t: T, u: U) -> R:
|
||||
"""Execute function with two parameters and return value"""
|
||||
return func(t, u)
|
||||
|
||||
class F31(Generic[T, U, V, R]):
|
||||
"""3 parameter 1 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[T, U, V], R], t: T, u: U, v: V) -> R:
|
||||
"""Execute function with three parameters and return value"""
|
||||
return func(t, u, v)
|
||||
|
||||
class F41(Generic[T, U, V, W, R]):
|
||||
"""4 parameter 1 return function"""
|
||||
@staticmethod
|
||||
def call(func: Callable[[T, U, V, W], R], t: T, u: U, v: V, w: W) -> R:
|
||||
"""Execute function with four parameters and return value"""
|
||||
return func(t, u, v, w)
|
||||
107
kit/HashKit.py
Normal file
107
kit/HashKit.py
Normal file
@@ -0,0 +1,107 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal HashKit - Hash and Encryption Utilities
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import secrets
|
||||
import threading
|
||||
|
||||
class HashKit:
|
||||
"""Hash utility class"""
|
||||
|
||||
FNV_OFFSET_BASIS_64 = 0xcbf29ce484222325
|
||||
FNV_PRIME_64 = 0x100000001b3
|
||||
|
||||
_HEX_DIGITS = "0123456789abcdef"
|
||||
_CHAR_ARRAY = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
|
||||
@staticmethod
|
||||
def fnv1a64(key: str) -> int:
|
||||
"""FNV-1a 64-bit hash"""
|
||||
hash_val = HashKit.FNV_OFFSET_BASIS_64
|
||||
for i in range(len(key)):
|
||||
hash_val ^= ord(key[i])
|
||||
hash_val *= HashKit.FNV_PRIME_64
|
||||
return hash_val
|
||||
|
||||
@staticmethod
|
||||
def md5(src_str: str) -> str:
|
||||
"""MD5 hash"""
|
||||
return HashKit._hash("md5", src_str)
|
||||
|
||||
@staticmethod
|
||||
def sha1(src_str: str) -> str:
|
||||
"""SHA-1 hash"""
|
||||
return HashKit._hash("sha1", src_str)
|
||||
|
||||
@staticmethod
|
||||
def sha256(src_str: str) -> str:
|
||||
"""SHA-256 hash"""
|
||||
return HashKit._hash("sha256", src_str)
|
||||
|
||||
@staticmethod
|
||||
def sha384(src_str: str) -> str:
|
||||
"""SHA-384 hash"""
|
||||
return HashKit._hash("sha384", src_str)
|
||||
|
||||
@staticmethod
|
||||
def sha512(src_str: str) -> str:
|
||||
"""SHA-512 hash"""
|
||||
return HashKit._hash("sha512", src_str)
|
||||
|
||||
@staticmethod
|
||||
def _hash(algorithm: str, src_str: str) -> str:
|
||||
"""Generic hash function"""
|
||||
try:
|
||||
md = hashlib.new(algorithm)
|
||||
md.update(src_str.encode('utf-8'))
|
||||
return HashKit.to_hex(md.digest())
|
||||
except Exception as e:
|
||||
raise RuntimeError(e)
|
||||
|
||||
@staticmethod
|
||||
def to_hex(bytes_data: bytes) -> str:
|
||||
"""Convert bytes to hex string"""
|
||||
ret = []
|
||||
for b in bytes_data:
|
||||
ret.append(HashKit._HEX_DIGITS[(b >> 4) & 0x0f])
|
||||
ret.append(HashKit._HEX_DIGITS[b & 0x0f])
|
||||
return ''.join(ret)
|
||||
|
||||
@staticmethod
|
||||
def generate_salt(salt_length: int) -> str:
|
||||
"""Generate random salt"""
|
||||
salt = []
|
||||
for i in range(salt_length):
|
||||
salt.append(secrets.choice(HashKit._CHAR_ARRAY))
|
||||
return ''.join(salt)
|
||||
|
||||
@staticmethod
|
||||
def generate_salt_for_sha256() -> str:
|
||||
"""Generate salt for SHA-256"""
|
||||
return HashKit.generate_salt(32)
|
||||
|
||||
@staticmethod
|
||||
def generate_salt_for_sha512() -> str:
|
||||
"""Generate salt for SHA-512"""
|
||||
return HashKit.generate_salt(64)
|
||||
|
||||
@staticmethod
|
||||
def generate_random_str(str_length: int) -> str:
|
||||
"""Generate random string"""
|
||||
return HashKit.generate_salt(str_length)
|
||||
|
||||
@staticmethod
|
||||
def slow_equals(a: bytes, b: bytes) -> bool:
|
||||
"""Timing-safe byte comparison"""
|
||||
if a is None or b is None:
|
||||
return False
|
||||
if len(a) != len(b):
|
||||
return False
|
||||
|
||||
diff = 0
|
||||
for i in range(len(a)):
|
||||
diff |= a[i] ^ b[i]
|
||||
return diff == 0
|
||||
75
kit/JavaKeyword.py
Normal file
75
kit/JavaKeyword.py
Normal file
@@ -0,0 +1,75 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal JavaKeyword - Java Keyword Detection
|
||||
"""
|
||||
|
||||
from typing import Set, FrozenSet
|
||||
|
||||
class JavaKeyword:
|
||||
"""Java keyword detection utility"""
|
||||
|
||||
_KEYWORD_SET: FrozenSet[str] = frozenset({
|
||||
"abstract", "assert", "boolean", "break", "byte", "case", "catch",
|
||||
"char", "class", "const", "continue", "default", "do", "double",
|
||||
"else", "enum", "extends", "final", "finally", "float", "for",
|
||||
"goto", "if", "implements", "import", "instanceof", "int",
|
||||
"interface", "long", "native", "new", "package", "private",
|
||||
"protected", "public", "return", "strictfp", "short", "static",
|
||||
"super", "switch", "synchronized", "this", "throw", "throws",
|
||||
"transient", "try", "void", "volatile", "while"
|
||||
})
|
||||
|
||||
_instance = None
|
||||
|
||||
@staticmethod
|
||||
def get_instance():
|
||||
"""Get shared instance"""
|
||||
if JavaKeyword._instance is None:
|
||||
JavaKeyword._instance = JavaKeyword()
|
||||
return JavaKeyword._instance
|
||||
|
||||
def __init__(self):
|
||||
self._keywords = set(JavaKeyword._KEYWORD_SET)
|
||||
# Make it read-only by default
|
||||
self._read_only = True
|
||||
|
||||
def add_keyword(self, keyword: str) -> 'JavaKeyword':
|
||||
"""Add custom keyword"""
|
||||
if self._read_only:
|
||||
raise RuntimeError("Cannot modify read-only JavaKeyword instance")
|
||||
|
||||
if keyword and keyword.strip():
|
||||
self._keywords.add(keyword)
|
||||
|
||||
return self
|
||||
|
||||
def remove_keyword(self, keyword: str) -> 'JavaKeyword':
|
||||
"""Remove keyword"""
|
||||
if self._read_only:
|
||||
raise RuntimeError("Cannot modify read-only JavaKeyword instance")
|
||||
|
||||
self._keywords.discard(keyword)
|
||||
return self
|
||||
|
||||
def contains(self, word: str) -> bool:
|
||||
"""Check if word is a Java keyword"""
|
||||
return word in self._keywords
|
||||
|
||||
def is_keyword(self, word: str) -> bool:
|
||||
"""Check if word is a Java keyword (alias of contains)"""
|
||||
return self.contains(word)
|
||||
|
||||
@property
|
||||
def keywords(self) -> FrozenSet[str]:
|
||||
"""Get all keywords"""
|
||||
return frozenset(self._keywords)
|
||||
|
||||
@property
|
||||
def keyword_count(self) -> int:
|
||||
"""Get keyword count"""
|
||||
return len(self._keywords)
|
||||
|
||||
|
||||
# Create shared instance
|
||||
me = JavaKeyword.get_instance()
|
||||
141
kit/Kv.py
Normal file
141
kit/Kv.py
Normal file
@@ -0,0 +1,141 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal Kv (Key Value) - Ordered Dictionary Implementation
|
||||
"""
|
||||
|
||||
from .StrKit import StrKit
|
||||
from .TypeKit import TypeKit
|
||||
|
||||
class Kv(dict):
|
||||
"""Key Value dictionary with utility methods"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def of(key, value):
|
||||
"""Create Kv with initial key-value pair"""
|
||||
return Kv().set(key, value)
|
||||
|
||||
@staticmethod
|
||||
def by(key, value):
|
||||
"""Create Kv with initial key-value pair (alias of of)"""
|
||||
return Kv.of(key, value)
|
||||
|
||||
@staticmethod
|
||||
def create():
|
||||
"""Create empty Kv"""
|
||||
return Kv()
|
||||
|
||||
def set(self, key, value):
|
||||
"""Set key-value pair and return self for chaining"""
|
||||
super().__setitem__(key, value)
|
||||
return self
|
||||
|
||||
def set_if_not_blank(self, key, value):
|
||||
"""Set value only if not blank"""
|
||||
if StrKit.not_blank(value):
|
||||
self.set(key, value)
|
||||
return self
|
||||
|
||||
def set_if_not_null(self, key, value):
|
||||
"""Set value only if not None"""
|
||||
if value is not None:
|
||||
self.set(key, value)
|
||||
return self
|
||||
|
||||
def delete(self, key):
|
||||
"""Delete key and return self"""
|
||||
super().pop(key, None)
|
||||
return self
|
||||
|
||||
def get_as(self, key, default_value=None):
|
||||
"""Get value as specific type"""
|
||||
return self.get(key, default_value)
|
||||
|
||||
def get_str(self, key, default_value=None):
|
||||
"""Get value as string"""
|
||||
value = self.get(key)
|
||||
return str(value) if value is not None else default_value
|
||||
|
||||
def get_int(self, key, default_value=None):
|
||||
"""Get value as integer"""
|
||||
return TypeKit.to_int(self.get(key, default_value))
|
||||
|
||||
def get_long(self, key, default_value=None):
|
||||
"""Get value as long integer"""
|
||||
return TypeKit.to_long(self.get(key, default_value))
|
||||
|
||||
def get_big_decimal(self, key, default_value=None):
|
||||
"""Get value as BigDecimal"""
|
||||
return TypeKit.to_big_decimal(self.get(key, default_value))
|
||||
|
||||
def get_double(self, key, default_value=None):
|
||||
"""Get value as double"""
|
||||
return TypeKit.to_double(self.get(key, default_value))
|
||||
|
||||
def get_float(self, key, default_value=None):
|
||||
"""Get value as float"""
|
||||
return TypeKit.to_float(self.get(key, default_value))
|
||||
|
||||
def get_number(self, key, default_value=None):
|
||||
"""Get value as Number"""
|
||||
return TypeKit.to_number(self.get(key, default_value))
|
||||
|
||||
def get_boolean(self, key, default_value=None):
|
||||
"""Get value as boolean"""
|
||||
return TypeKit.to_boolean(self.get(key, default_value))
|
||||
|
||||
def get_date(self, key, default_value=None):
|
||||
"""Get value as Date"""
|
||||
return TypeKit.to_date(self.get(key, default_value))
|
||||
|
||||
def get_local_datetime(self, key, default_value=None):
|
||||
"""Get value as LocalDateTime"""
|
||||
return TypeKit.to_local_date_time(self.get(key, default_value))
|
||||
|
||||
def not_null(self, key):
|
||||
"""Check if key exists and value is not None"""
|
||||
return self.get(key) is not None
|
||||
|
||||
def is_null(self, key):
|
||||
"""Check if key doesn't exist or value is None"""
|
||||
return self.get(key) is None
|
||||
|
||||
def not_blank(self, key):
|
||||
"""Check if value is not blank string"""
|
||||
return StrKit.not_blank(self.get_str(key))
|
||||
|
||||
def is_blank(self, key):
|
||||
"""Check if value is blank string"""
|
||||
return StrKit.is_blank(self.get_str(key))
|
||||
|
||||
def is_true(self, key):
|
||||
"""Check if value is True"""
|
||||
value = self.get(key)
|
||||
return value is not None and TypeKit.to_boolean(value)
|
||||
|
||||
def is_false(self, key):
|
||||
"""Check if value is False"""
|
||||
value = self.get(key)
|
||||
return value is not None and not TypeKit.to_boolean(value)
|
||||
|
||||
def keep(self, *keys):
|
||||
"""Keep only specified keys"""
|
||||
if keys and len(keys) > 0:
|
||||
new_kv = Kv()
|
||||
for key in keys:
|
||||
if self.contains_key(key):
|
||||
new_kv.set(key, self.get(key))
|
||||
|
||||
self.clear()
|
||||
self.update(new_kv)
|
||||
else:
|
||||
self.clear()
|
||||
|
||||
return self
|
||||
|
||||
def to_map(self):
|
||||
"""Convert to regular map"""
|
||||
return self
|
||||
143
kit/Okv.py
Normal file
143
kit/Okv.py
Normal file
@@ -0,0 +1,143 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal Okv (Ordered Key Value) - Ordered Dictionary Implementation
|
||||
"""
|
||||
|
||||
from .Kv import Kv
|
||||
from .StrKit import StrKit
|
||||
from .TypeKit import TypeKit
|
||||
|
||||
class Okv(dict):
|
||||
"""Ordered Key Value dictionary with utility methods"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
# In Python 3.7+, dict maintains insertion order
|
||||
|
||||
@staticmethod
|
||||
def of(key, value):
|
||||
"""Create Okv with initial key-value pair"""
|
||||
return Okv().set(key, value)
|
||||
|
||||
@staticmethod
|
||||
def by(key, value):
|
||||
"""Create Okv with initial key-value pair (alias of of)"""
|
||||
return Okv.of(key, value)
|
||||
|
||||
@staticmethod
|
||||
def create():
|
||||
"""Create empty Okv"""
|
||||
return Okv()
|
||||
|
||||
def set(self, key, value):
|
||||
"""Set key-value pair and return self for chaining"""
|
||||
super().__setitem__(key, value)
|
||||
return self
|
||||
|
||||
def set_if_not_blank(self, key, value):
|
||||
"""Set value only if not blank"""
|
||||
if StrKit.not_blank(value):
|
||||
self.set(key, value)
|
||||
return self
|
||||
|
||||
def set_if_not_null(self, key, value):
|
||||
"""Set value only if not None"""
|
||||
if value is not None:
|
||||
self.set(key, value)
|
||||
return self
|
||||
|
||||
def delete(self, key):
|
||||
"""Delete key and return self"""
|
||||
super().pop(key, None)
|
||||
return self
|
||||
|
||||
def get_as(self, key, default_value=None):
|
||||
"""Get value as specific type"""
|
||||
return self.get(key, default_value)
|
||||
|
||||
def get_str(self, key, default_value=None):
|
||||
"""Get value as string"""
|
||||
value = self.get(key)
|
||||
return str(value) if value is not None else default_value
|
||||
|
||||
def get_int(self, key, default_value=None):
|
||||
"""Get value as integer"""
|
||||
return TypeKit.to_int(self.get(key, default_value))
|
||||
|
||||
def get_long(self, key, default_value=None):
|
||||
"""Get value as long integer"""
|
||||
return TypeKit.to_long(self.get(key, default_value))
|
||||
|
||||
def get_big_decimal(self, key, default_value=None):
|
||||
"""Get value as BigDecimal"""
|
||||
return TypeKit.to_big_decimal(self.get(key, default_value))
|
||||
|
||||
def get_double(self, key, default_value=None):
|
||||
"""Get value as double"""
|
||||
return TypeKit.to_double(self.get(key, default_value))
|
||||
|
||||
def get_float(self, key, default_value=None):
|
||||
"""Get value as float"""
|
||||
return TypeKit.to_float(self.get(key, default_value))
|
||||
|
||||
def get_number(self, key, default_value=None):
|
||||
"""Get value as Number"""
|
||||
return TypeKit.to_number(self.get(key, default_value))
|
||||
|
||||
def get_boolean(self, key, default_value=None):
|
||||
"""Get value as boolean"""
|
||||
return TypeKit.to_boolean(self.get(key, default_value))
|
||||
|
||||
def get_date(self, key, default_value=None):
|
||||
"""Get value as Date"""
|
||||
return TypeKit.to_date(self.get(key, default_value))
|
||||
|
||||
def get_local_datetime(self, key, default_value=None):
|
||||
"""Get value as LocalDateTime"""
|
||||
return TypeKit.to_local_date_time(self.get(key, default_value))
|
||||
|
||||
def not_null(self, key):
|
||||
"""Check if key exists and value is not None"""
|
||||
return self.get(key) is not None
|
||||
|
||||
def is_null(self, key):
|
||||
"""Check if key doesn't exist or value is None"""
|
||||
return self.get(key) is None
|
||||
|
||||
def not_blank(self, key):
|
||||
"""Check if value is not blank string"""
|
||||
return StrKit.not_blank(self.get_str(key))
|
||||
|
||||
def is_blank(self, key):
|
||||
"""Check if value is blank string"""
|
||||
return StrKit.is_blank(self.get_str(key))
|
||||
|
||||
def is_true(self, key):
|
||||
"""Check if value is True"""
|
||||
value = self.get(key)
|
||||
return value is not None and TypeKit.to_boolean(value)
|
||||
|
||||
def is_false(self, key):
|
||||
"""Check if value is False"""
|
||||
value = self.get(key)
|
||||
return value is not None and not TypeKit.to_boolean(value)
|
||||
|
||||
def keep(self, *keys):
|
||||
"""Keep only specified keys"""
|
||||
if keys and len(keys) > 0:
|
||||
new_okv = Okv()
|
||||
for key in keys:
|
||||
if self.contains_key(key):
|
||||
new_okv.set(key, self.get(key))
|
||||
|
||||
self.clear()
|
||||
self.update(new_okv)
|
||||
else:
|
||||
self.clear()
|
||||
|
||||
return self
|
||||
|
||||
def to_map(self):
|
||||
"""Convert to regular map"""
|
||||
return self
|
||||
166
kit/Prop.py
Normal file
166
kit/Prop.py
Normal file
@@ -0,0 +1,166 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal Prop - Properties File Handler
|
||||
"""
|
||||
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
class Prop:
|
||||
"""Properties file handler"""
|
||||
|
||||
DEFAULT_ENCODING = "UTF-8"
|
||||
|
||||
def __init__(self, file_name_or_content=None, encoding: str = DEFAULT_ENCODING, is_file: bool = True):
|
||||
"""
|
||||
Initialize Prop
|
||||
|
||||
Args:
|
||||
file_name_or_content: file name (if is_file=True) or content string
|
||||
encoding: encoding for reading properties file
|
||||
is_file: True if first parameter is file name, False if it's content string
|
||||
"""
|
||||
self._properties = {}
|
||||
|
||||
if file_name_or_content is None:
|
||||
return
|
||||
|
||||
if is_file:
|
||||
self._load_from_file(file_name_or_content, encoding)
|
||||
else:
|
||||
self._load_from_content(file_name_or_content, encoding)
|
||||
|
||||
def _load_from_file(self, file_name: str, encoding: str):
|
||||
"""Load properties from file"""
|
||||
try:
|
||||
# Try to find file in classpath first
|
||||
import importlib.resources
|
||||
try:
|
||||
with importlib.resources.open_text(file_name, encoding=encoding) as f:
|
||||
self._parse_properties(f.read())
|
||||
return
|
||||
except (FileNotFoundError, TypeError):
|
||||
pass
|
||||
|
||||
# Try as absolute file path
|
||||
if os.path.isabs(file_name):
|
||||
file_path = file_name
|
||||
else:
|
||||
file_path = os.path.join(os.getcwd(), file_name)
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
raise FileNotFoundError(f"Properties file not found: {file_name}")
|
||||
|
||||
with open(file_path, 'r', encoding=encoding) as f:
|
||||
self._parse_properties(f.read())
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error loading properties file: {e}")
|
||||
|
||||
def _load_from_content(self, content: str, encoding: str):
|
||||
"""Load properties from content string"""
|
||||
try:
|
||||
self._parse_properties(content)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error parsing properties content: {e}")
|
||||
|
||||
def _parse_properties(self, content: str):
|
||||
"""Parse properties content"""
|
||||
for line in content.split('\n'):
|
||||
line = line.strip()
|
||||
|
||||
# Skip empty lines and comments
|
||||
if not line or line.startswith('#') or line.startswith('!'):
|
||||
continue
|
||||
|
||||
# Handle line continuation
|
||||
while line.endswith('\\'):
|
||||
line = line[:-1].strip()
|
||||
# This is a simplified implementation
|
||||
|
||||
# Find the first separator
|
||||
separator_idx = -1
|
||||
for sep in ['=', ':']:
|
||||
idx = line.find(sep)
|
||||
if idx != -1:
|
||||
if separator_idx == -1 or idx < separator_idx:
|
||||
separator_idx = idx
|
||||
|
||||
if separator_idx == -1:
|
||||
# No separator, treat as key with empty value
|
||||
key = line.strip()
|
||||
value = ""
|
||||
else:
|
||||
key = line[:separator_idx].strip()
|
||||
value = line[separator_idx + 1:].strip()
|
||||
|
||||
if key:
|
||||
self._properties[key] = value
|
||||
|
||||
def get(self, key: str, default_value: str = None) -> Optional[str]:
|
||||
"""Get property value"""
|
||||
value = self._properties.get(key)
|
||||
if value is not None and len(value) != 0:
|
||||
return value.strip()
|
||||
return default_value
|
||||
|
||||
def get_int(self, key: str, default_value: int = None) -> Optional[int]:
|
||||
"""Get property as integer"""
|
||||
value = self.get(key)
|
||||
if value is not None:
|
||||
return int(value)
|
||||
return default_value
|
||||
|
||||
def get_long(self, key: str, default_value: int = None) -> Optional[int]:
|
||||
"""Get property as long integer"""
|
||||
value = self.get(key)
|
||||
if value is not None:
|
||||
return int(value)
|
||||
return default_value
|
||||
|
||||
def get_double(self, key: str, default_value: float = None) -> Optional[float]:
|
||||
"""Get property as double"""
|
||||
value = self.get(key)
|
||||
if value is not None:
|
||||
return float(value)
|
||||
return default_value
|
||||
|
||||
def get_boolean(self, key: str, default_value: bool = None) -> Optional[bool]:
|
||||
"""Get property as boolean"""
|
||||
value = self.get(key)
|
||||
if value is not None:
|
||||
value_lower = value.lower().strip()
|
||||
if value_lower == "true":
|
||||
return True
|
||||
elif value_lower == "false":
|
||||
return False
|
||||
else:
|
||||
raise ValueError(f"Cannot parse boolean value: {value}")
|
||||
return default_value
|
||||
|
||||
def contains_key(self, key: str) -> bool:
|
||||
"""Check if key exists"""
|
||||
return key in self._properties
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
"""Check if properties is empty"""
|
||||
return len(self._properties) == 0
|
||||
|
||||
def not_empty(self) -> bool:
|
||||
"""Check if properties is not empty"""
|
||||
return not self.is_empty()
|
||||
|
||||
def append(self, prop) -> 'Prop':
|
||||
"""Append properties from another Prop"""
|
||||
if prop is None:
|
||||
raise ValueError("prop cannot be None")
|
||||
|
||||
for key, value in prop.get_properties().items():
|
||||
self._properties[key] = value
|
||||
|
||||
return self
|
||||
|
||||
def get_properties(self) -> dict:
|
||||
"""Get all properties"""
|
||||
return self._properties.copy()
|
||||
135
kit/PropKit.py
Normal file
135
kit/PropKit.py
Normal file
@@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal PropKit - Properties File Management
|
||||
"""
|
||||
|
||||
import os
|
||||
from typing import Optional, Dict
|
||||
from .Prop import Prop
|
||||
from .StrKit import StrKit
|
||||
|
||||
class PropKit:
|
||||
"""Properties file management utility"""
|
||||
|
||||
_env_key = "app.env"
|
||||
_prop: Optional[Prop] = None
|
||||
_cache: Dict[str, Prop] = {}
|
||||
|
||||
def __init__(self):
|
||||
raise NotImplementedError("PropKit is a utility class and cannot be instantiated")
|
||||
|
||||
@staticmethod
|
||||
def set_env_key(env_key: str):
|
||||
"""Set environment key for config"""
|
||||
PropKit._env_key = env_key
|
||||
|
||||
@staticmethod
|
||||
def get_env_key() -> str:
|
||||
"""Get environment key"""
|
||||
return PropKit._env_key
|
||||
|
||||
@staticmethod
|
||||
def get_env() -> str:
|
||||
"""Get current environment value"""
|
||||
return PropKit.get_prop().get(PropKit._env_key)
|
||||
|
||||
@staticmethod
|
||||
def use(file_name: str, encoding: str = Prop.DEFAULT_ENCODING) -> Prop:
|
||||
"""Use properties file"""
|
||||
if file_name not in PropKit._cache:
|
||||
PropKit._cache[file_name] = Prop(file_name, encoding)
|
||||
PropKit._handle_env(PropKit._cache[file_name], file_name)
|
||||
|
||||
if PropKit._prop is None:
|
||||
PropKit._prop = PropKit._cache[file_name]
|
||||
|
||||
return PropKit._cache[file_name]
|
||||
|
||||
@staticmethod
|
||||
def _handle_env(result: Prop, file_name: str):
|
||||
"""Handle environment-specific configuration"""
|
||||
env = result.get(PropKit._env_key)
|
||||
if StrKit.not_blank(env):
|
||||
dot_index = file_name.rfind('.')
|
||||
if dot_index != -1:
|
||||
env_config_name = file_name[:dot_index] + "-" + env + file_name[dot_index:]
|
||||
else:
|
||||
env_config_name = file_name + "-" + env
|
||||
|
||||
try:
|
||||
env_config = Prop(env_config_name)
|
||||
result.append(env_config)
|
||||
except:
|
||||
pass # Ignore if env config doesn't exist
|
||||
|
||||
@staticmethod
|
||||
def useless(file_name: str) -> Optional[Prop]:
|
||||
"""Remove properties file from cache"""
|
||||
removed = PropKit._cache.pop(file_name, None)
|
||||
if PropKit._prop is removed:
|
||||
PropKit._prop = None
|
||||
return removed
|
||||
|
||||
@staticmethod
|
||||
def clear():
|
||||
"""Clear all cached properties"""
|
||||
PropKit._prop = None
|
||||
PropKit._cache.clear()
|
||||
|
||||
@staticmethod
|
||||
def append(prop: Prop) -> Prop:
|
||||
"""Append properties"""
|
||||
with PropKit:
|
||||
if PropKit._prop is not None:
|
||||
PropKit._prop.append(prop)
|
||||
else:
|
||||
PropKit._prop = prop
|
||||
return PropKit._prop
|
||||
|
||||
@staticmethod
|
||||
def append_content(content: str, encoding: str = Prop.DEFAULT_ENCODING) -> Prop:
|
||||
"""Append properties from content string"""
|
||||
return PropKit.append(Prop(content, encoding, is_file=False))
|
||||
|
||||
@staticmethod
|
||||
def get_prop() -> Prop:
|
||||
"""Get current properties"""
|
||||
if PropKit._prop is None:
|
||||
raise IllegalStateException("Load properties file by invoking PropKit.use(String fileName) method first.")
|
||||
return PropKit._prop
|
||||
|
||||
@staticmethod
|
||||
def get(key: str, default_value: str = None) -> Optional[str]:
|
||||
"""Get property value"""
|
||||
return PropKit.get_prop().get(key, default_value)
|
||||
|
||||
@staticmethod
|
||||
def get_int(key: str, default_value: int = None) -> Optional[int]:
|
||||
"""Get property as integer"""
|
||||
return PropKit.get_prop().get_int(key, default_value)
|
||||
|
||||
@staticmethod
|
||||
def get_long(key: str, default_value: int = None) -> Optional[int]:
|
||||
"""Get property as long"""
|
||||
return PropKit.get_prop().get_long(key, default_value)
|
||||
|
||||
@staticmethod
|
||||
def get_double(key: str, default_value: float = None) -> Optional[float]:
|
||||
"""Get property as double"""
|
||||
return PropKit.get_prop().get_double(key, default_value)
|
||||
|
||||
@staticmethod
|
||||
def get_boolean(key: str, default_value: bool = None) -> Optional[bool]:
|
||||
"""Get property as boolean"""
|
||||
return PropKit.get_prop().get_boolean(key, default_value)
|
||||
|
||||
@staticmethod
|
||||
def contains_key(key: str) -> bool:
|
||||
"""Check if key exists"""
|
||||
return PropKit.get_prop().contains_key(key)
|
||||
|
||||
|
||||
class IllegalStateException(Exception):
|
||||
"""Illegal state exception"""
|
||||
pass
|
||||
107
kit/ReflectKit.py
Normal file
107
kit/ReflectKit.py
Normal file
@@ -0,0 +1,107 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal ReflectKit - Reflection Utilities
|
||||
"""
|
||||
|
||||
import inspect
|
||||
from typing import Any, Dict, List, Type
|
||||
|
||||
class ReflectKit:
|
||||
"""Reflection utility class"""
|
||||
|
||||
@staticmethod
|
||||
def new_instance(clazz: Type) -> Any:
|
||||
"""Create new instance of class"""
|
||||
try:
|
||||
# Try __new__ first (for immutable types)
|
||||
if hasattr(clazz, '__new__'):
|
||||
obj = clazz.__new__(clazz)
|
||||
if obj is not None and not isinstance(obj, type):
|
||||
return obj
|
||||
|
||||
# Fall back to regular instantiation
|
||||
return clazz()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error creating instance of {clazz}: {e}")
|
||||
|
||||
@staticmethod
|
||||
def get_method_signature(method) -> str:
|
||||
"""Get method signature as string"""
|
||||
if not callable(method):
|
||||
raise ValueError("method must be callable")
|
||||
|
||||
try:
|
||||
# Get function information
|
||||
if hasattr(method, '__name__'):
|
||||
method_name = method.__name__
|
||||
else:
|
||||
method_name = str(method)
|
||||
|
||||
# Get declaring class
|
||||
if hasattr(method, '__qualname__'):
|
||||
declaring_class = method.__qualname__.rsplit('.', 1)[0] if '.' in method.__qualname__ else ''
|
||||
else:
|
||||
declaring_class = ''
|
||||
|
||||
# Get parameters
|
||||
try:
|
||||
sig = inspect.signature(method)
|
||||
params = []
|
||||
for param_name, param in sig.parameters.items():
|
||||
if param_name != 'self':
|
||||
param_type = param.annotation if param.annotation != inspect.Parameter.empty else Any
|
||||
params.append(str(param_type.__name__) if hasattr(param_type, '__name__') else str(param_type))
|
||||
except (ValueError, TypeError):
|
||||
params = []
|
||||
|
||||
# Build signature
|
||||
if declaring_class:
|
||||
signature = f"{declaring_class}.{method_name}({', '.join(params)})"
|
||||
else:
|
||||
signature = f"{method_name}({', '.join(params)})"
|
||||
|
||||
return signature
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error getting method signature: {e}")
|
||||
|
||||
@staticmethod
|
||||
def get_class_methods(clazz: Type) -> Dict[str, callable]:
|
||||
"""Get all methods of a class"""
|
||||
methods = {}
|
||||
try:
|
||||
for name, method in inspect.getmembers(clazz, predicate=inspect.isfunction):
|
||||
if not name.startswith('_'):
|
||||
methods[name] = method
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error getting class methods: {e}")
|
||||
return methods
|
||||
|
||||
@staticmethod
|
||||
def get_class_attributes(clazz: Type) -> Dict[str, Any]:
|
||||
"""Get all class attributes"""
|
||||
attributes = {}
|
||||
try:
|
||||
for name, value in inspect.getmembers(clazz):
|
||||
if not name.startswith('_') and not callable(value):
|
||||
attributes[name] = value
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error getting class attributes: {e}")
|
||||
return attributes
|
||||
|
||||
@staticmethod
|
||||
def is_subclass_of(subclass: Type, superclass: Type) -> bool:
|
||||
"""Check if subclass is subclass of superclass"""
|
||||
try:
|
||||
return issubclass(subclass, superclass)
|
||||
except TypeError:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def is_instance_of(obj: Any, clazz: Type) -> bool:
|
||||
"""Check if object is instance of class"""
|
||||
try:
|
||||
return isinstance(obj, clazz)
|
||||
except TypeError:
|
||||
return False
|
||||
139
kit/StrKit.py
Normal file
139
kit/StrKit.py
Normal file
@@ -0,0 +1,139 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal StrKit - String Utilities
|
||||
"""
|
||||
|
||||
class StrKit:
|
||||
"""String utility class"""
|
||||
|
||||
@staticmethod
|
||||
def first_char_to_lower_case(s: str) -> str:
|
||||
"""首字母变小写"""
|
||||
if not s:
|
||||
return s
|
||||
first_char = s[0]
|
||||
if 'A' <= first_char <= 'Z':
|
||||
arr = list(s)
|
||||
arr[0] = chr(ord(arr[0]) + ord('a') - ord('A'))
|
||||
return ''.join(arr)
|
||||
return s
|
||||
|
||||
@staticmethod
|
||||
def first_char_to_upper_case(s: str) -> str:
|
||||
"""首字母变大写"""
|
||||
if not s:
|
||||
return s
|
||||
first_char = s[0]
|
||||
if 'a' <= first_char <= 'z':
|
||||
arr = list(s)
|
||||
arr[0] = chr(ord(arr[0]) - (ord('a') - ord('A')))
|
||||
return ''.join(arr)
|
||||
return s
|
||||
|
||||
@staticmethod
|
||||
def is_blank(s: str) -> bool:
|
||||
"""字符串为 null 或者内部字符全部为 ' ' '\\t' '\\n' '\\r' 这四类字符时返回 true"""
|
||||
if s is None:
|
||||
return True
|
||||
for char in s:
|
||||
if char > ' ':
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def not_blank(s: str) -> bool:
|
||||
return not StrKit.is_blank(s)
|
||||
|
||||
@staticmethod
|
||||
def not_blank(*strings) -> bool:
|
||||
if strings is None or len(strings) == 0:
|
||||
return False
|
||||
for s in strings:
|
||||
if StrKit.is_blank(s):
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def has_blank(*strings) -> bool:
|
||||
if strings is None or len(strings) == 0:
|
||||
return True
|
||||
for s in strings:
|
||||
if StrKit.is_blank(s):
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def not_null(*paras) -> bool:
|
||||
if paras is None:
|
||||
return False
|
||||
for obj in paras:
|
||||
if obj is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def default_if_blank(s: str, default_value: str) -> str:
|
||||
return default_value if StrKit.is_blank(s) else s
|
||||
|
||||
@staticmethod
|
||||
def to_camel_case(s: str) -> str:
|
||||
"""将包含下划线字符 '_' 的字符串转换成驼峰格式,不包含下划线则原样返回"""
|
||||
return StrKit._to_camel_case(s, False)
|
||||
|
||||
@staticmethod
|
||||
def _to_camel_case(s: str, to_lower_case_anyway: bool) -> str:
|
||||
length = len(s)
|
||||
if length <= 1:
|
||||
return s
|
||||
|
||||
buf = []
|
||||
index = 0
|
||||
i = 0
|
||||
while i < length:
|
||||
ch = s[i]
|
||||
if ch == '_':
|
||||
i += 1
|
||||
if i < length:
|
||||
ch = s[i]
|
||||
if index == 0:
|
||||
buf.append(chr(ord(ch) + 32) if 'A' <= ch <= 'Z' else ch)
|
||||
else:
|
||||
buf.append(chr(ord(ch) - 32) if 'a' <= ch <= 'z' else ch)
|
||||
index += 1
|
||||
else:
|
||||
buf.append(chr(ord(ch) + 32) if 'A' <= ch <= 'Z' else ch)
|
||||
index += 1
|
||||
i += 1
|
||||
|
||||
if to_lower_case_anyway:
|
||||
return ''.join(buf[:index])
|
||||
|
||||
return s if i == index else ''.join(buf[:index])
|
||||
|
||||
@staticmethod
|
||||
def join(string_array, separator: str = '') -> str:
|
||||
"""Join string array"""
|
||||
if not string_array:
|
||||
return ''
|
||||
if separator:
|
||||
return separator.join(string_array)
|
||||
return ''.join(string_array)
|
||||
|
||||
@staticmethod
|
||||
def slow_equals(a: str, b: str) -> bool:
|
||||
"""Timing-safe string comparison"""
|
||||
from .HashKit import HashKit
|
||||
a_bytes = a.encode('utf-8') if a else None
|
||||
b_bytes = b.encode('utf-8') if b else None
|
||||
return HashKit.slow_equals(a_bytes, b_bytes)
|
||||
|
||||
@staticmethod
|
||||
def equals(a: str, b: str) -> bool:
|
||||
return a is None if b is None else a == b
|
||||
|
||||
@staticmethod
|
||||
def get_random_uuid() -> str:
|
||||
"""Generate random UUID without dashes"""
|
||||
import uuid
|
||||
return uuid.uuid4().hex
|
||||
111
kit/SyncWriteMap.py
Normal file
111
kit/SyncWriteMap.py
Normal file
@@ -0,0 +1,111 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal SyncWriteMap - Thread-Safe Dictionary
|
||||
"""
|
||||
|
||||
from typing import Dict, Generic, TypeVar, Optional, Callable
|
||||
from threading import Lock
|
||||
|
||||
K = TypeVar('K')
|
||||
V = TypeVar('V')
|
||||
|
||||
class SyncWriteDict(Dict[K, V]):
|
||||
"""Thread-safe dictionary with synchronized write operations"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._lock = Lock()
|
||||
|
||||
def put(self, key: K, value: V) -> Optional[V]:
|
||||
"""Put key-value pair with synchronization"""
|
||||
with self._lock:
|
||||
old_value = self.get(key)
|
||||
super().__setitem__(key, value)
|
||||
return old_value
|
||||
|
||||
def put_if_absent(self, key: K, value: V) -> V:
|
||||
"""Put value only if key doesn't exist"""
|
||||
with self._lock:
|
||||
if key not in self:
|
||||
super().__setitem__(key, value)
|
||||
return value
|
||||
return self[key]
|
||||
|
||||
def remove(self, key: K) -> Optional[V]:
|
||||
"""Remove key with synchronization"""
|
||||
with self._lock:
|
||||
return super().pop(key, None)
|
||||
|
||||
def clear(self):
|
||||
"""Clear all items with synchronization"""
|
||||
with self._lock:
|
||||
super().clear()
|
||||
|
||||
def update(self, other: dict = None, **kwargs):
|
||||
"""Update dictionary with synchronization"""
|
||||
with self._lock:
|
||||
if other:
|
||||
super().update(other)
|
||||
if kwargs:
|
||||
super().update(kwargs)
|
||||
|
||||
def compute_if_absent(self, key: K, mapping_function: Callable[[K], V]) -> V:
|
||||
"""Compute value if key is absent"""
|
||||
with self._lock:
|
||||
if key not in self:
|
||||
value = mapping_function(key)
|
||||
super().__setitem__(key, value)
|
||||
return value
|
||||
return self[key]
|
||||
|
||||
def compute_if_present(self, key: K, remapping_function: Callable[[K, V], V]) -> Optional[V]:
|
||||
"""Compute new value if key exists"""
|
||||
with self._lock:
|
||||
if key in self:
|
||||
old_value = self[key]
|
||||
new_value = remapping_function(key, old_value)
|
||||
if new_value is None:
|
||||
super().__delitem__(key)
|
||||
else:
|
||||
super().__setitem__(key, new_value)
|
||||
return new_value
|
||||
return None
|
||||
|
||||
def compute(self, key: K, remapping_function: Callable[[K, Optional[V]], Optional[V]]) -> Optional[V]:
|
||||
"""Compute new value for key"""
|
||||
with self._lock:
|
||||
old_value = self.get(key)
|
||||
new_value = remapping_function(key, old_value)
|
||||
if new_value is None:
|
||||
if key in self:
|
||||
super().__delitem__(key)
|
||||
else:
|
||||
super().__setitem__(key, new_value)
|
||||
return new_value
|
||||
|
||||
def replace(self, key: K, old_value: V, new_value: V) -> bool:
|
||||
"""Replace value if old value matches"""
|
||||
with self._lock:
|
||||
if key in self and self[key] == old_value:
|
||||
super().__setitem__(key, new_value)
|
||||
return True
|
||||
return False
|
||||
|
||||
def replace_value(self, key: K, value: V) -> bool:
|
||||
"""Replace value regardless of old value"""
|
||||
with self._lock:
|
||||
if key in self:
|
||||
super().__setitem__(key, value)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class SyncWriteMap(SyncWriteDict):
|
||||
"""Thread-safe dictionary (alias for compatibility)"""
|
||||
|
||||
def __init__(self, initial_capacity: int = None, load_factor: float = None,
|
||||
mapping: Dict[K, V] = None):
|
||||
super().__init__(mapping or {})
|
||||
# Note: initial_capacity and load_factor are ignored in Python dict implementation
|
||||
# but kept for API compatibility
|
||||
150
kit/TimeKit.py
Normal file
150
kit/TimeKit.py
Normal file
@@ -0,0 +1,150 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal TimeKit - Date and Time Utilities
|
||||
"""
|
||||
|
||||
from datetime import datetime, date, time, timedelta
|
||||
from typing import Union
|
||||
import threading
|
||||
|
||||
class TimeKit:
|
||||
"""Date and time utility class"""
|
||||
|
||||
_formatters = {}
|
||||
_sdf_cache = threading.local()
|
||||
|
||||
@staticmethod
|
||||
def _get_sdf(pattern: str):
|
||||
"""Get thread-local SimpleDateFormat equivalent"""
|
||||
if not hasattr(TimeKit._sdf_cache, 'formatters'):
|
||||
TimeKit._sdf_cache.formatters = {}
|
||||
|
||||
if pattern not in TimeKit._sdf_cache.formatters:
|
||||
TimeKit._sdf_cache.formatters[pattern] = _SimpleDateFormat(pattern)
|
||||
|
||||
return TimeKit._sdf_cache.formatters[pattern]
|
||||
|
||||
@staticmethod
|
||||
def now(pattern: str = "yyyy-MM-dd HH:mm:ss") -> str:
|
||||
"""Get current time as string"""
|
||||
return datetime.now().strftime(TimeKit._to_python_pattern(pattern))
|
||||
|
||||
@staticmethod
|
||||
def now_with_millisecond() -> str:
|
||||
"""Get current time with millisecond precision"""
|
||||
return datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3]
|
||||
|
||||
@staticmethod
|
||||
def format(dt: Union[datetime, date], pattern: str = "yyyy-MM-dd HH:mm:ss") -> str:
|
||||
"""Format datetime or date to string"""
|
||||
python_pattern = TimeKit._to_python_pattern(pattern)
|
||||
|
||||
if isinstance(dt, datetime):
|
||||
return dt.strftime(python_pattern)
|
||||
elif isinstance(dt, date):
|
||||
return dt.strftime(TimeKit._to_python_pattern(pattern))
|
||||
elif isinstance(dt, time):
|
||||
return dt.strftime(TimeKit._to_python_pattern(pattern))
|
||||
else:
|
||||
raise ValueError(f"Unsupported type: {type(dt)}")
|
||||
|
||||
@staticmethod
|
||||
def parse(date_string: str, pattern: str = "yyyy-MM-dd HH:mm:ss") -> datetime:
|
||||
"""Parse string to datetime"""
|
||||
try:
|
||||
python_pattern = TimeKit._to_python_pattern(pattern)
|
||||
return datetime.strptime(date_string, python_pattern)
|
||||
except ValueError as e:
|
||||
raise RuntimeError(f"Error parsing date: {e}")
|
||||
|
||||
@staticmethod
|
||||
def parse_date(date_string: str, pattern: str = "yyyy-MM-dd") -> date:
|
||||
"""Parse string to date"""
|
||||
dt = TimeKit.parse(date_string, pattern)
|
||||
return dt.date()
|
||||
|
||||
@staticmethod
|
||||
def parse_time(time_string: str, pattern: str = "HH:mm:ss") -> time:
|
||||
"""Parse string to time"""
|
||||
dt = TimeKit.parse(f"1970-01-01 {time_string}", f"yyyy-MM-dd {pattern}")
|
||||
return dt.time()
|
||||
|
||||
@staticmethod
|
||||
def to_datetime(dt: Union[datetime, date]) -> datetime:
|
||||
"""Convert date to datetime"""
|
||||
if isinstance(dt, datetime):
|
||||
return dt
|
||||
elif isinstance(dt, date):
|
||||
return datetime.combine(dt, time.min)
|
||||
else:
|
||||
raise ValueError(f"Unsupported type: {type(dt)}")
|
||||
|
||||
@staticmethod
|
||||
def to_date(dt: Union[datetime, date]) -> date:
|
||||
"""Convert datetime to date"""
|
||||
if isinstance(dt, datetime):
|
||||
return dt.date()
|
||||
elif isinstance(dt, date):
|
||||
return dt
|
||||
else:
|
||||
raise ValueError(f"Unsupported type: {type(dt)}")
|
||||
|
||||
@staticmethod
|
||||
def is_after(dt1: datetime, dt2: datetime) -> bool:
|
||||
"""Check if dt1 is after dt2"""
|
||||
return dt1 > dt2
|
||||
|
||||
@staticmethod
|
||||
def is_before(dt1: datetime, dt2: datetime) -> bool:
|
||||
"""Check if dt1 is before dt2"""
|
||||
return dt1 < dt2
|
||||
|
||||
@staticmethod
|
||||
def is_equal(dt1: datetime, dt2: datetime) -> bool:
|
||||
"""Check if dt1 equals dt2"""
|
||||
return dt1 == dt2
|
||||
|
||||
@staticmethod
|
||||
def _to_python_pattern(java_pattern: str) -> str:
|
||||
"""Convert Java date pattern to Python strftime pattern"""
|
||||
mapping = {
|
||||
'yyyy': '%Y',
|
||||
'yy': '%y',
|
||||
'MM': '%m',
|
||||
'dd': '%d',
|
||||
'HH': '%H',
|
||||
'mm': '%M',
|
||||
'ss': '%S',
|
||||
'SSS': '%f',
|
||||
}
|
||||
|
||||
result = java_pattern
|
||||
for java_fmt, python_fmt in mapping.items():
|
||||
result = result.replace(java_fmt, python_fmt)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class _SimpleDateFormat:
|
||||
"""Simple date format implementation (Java SimpleDateFormat equivalent)"""
|
||||
|
||||
def __init__(self, pattern: str):
|
||||
self.pattern = pattern
|
||||
self.python_pattern = TimeKit._to_python_pattern(pattern)
|
||||
|
||||
def format(self, dt: Union[datetime, date]) -> str:
|
||||
"""Format datetime to string"""
|
||||
if isinstance(dt, datetime):
|
||||
return dt.strftime(self.python_pattern)
|
||||
elif isinstance(dt, date):
|
||||
return dt.strftime(self.python_pattern)
|
||||
else:
|
||||
raise ValueError(f"Unsupported type: {type(dt)}")
|
||||
|
||||
def parse(self, date_string: str) -> datetime:
|
||||
"""Parse string to datetime"""
|
||||
try:
|
||||
return datetime.strptime(date_string, self.python_pattern)
|
||||
except ValueError as e:
|
||||
raise RuntimeError(f"Error parsing date: {e}")
|
||||
206
kit/TypeKit.py
Normal file
206
kit/TypeKit.py
Normal file
@@ -0,0 +1,206 @@
|
||||
#!/usr/bin/env python3.9
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JFinal TypeKit - Type Conversion Utilities
|
||||
"""
|
||||
|
||||
from datetime import datetime, date, time, timedelta
|
||||
from decimal import Decimal, InvalidOperation
|
||||
import re
|
||||
|
||||
class TypeKit:
|
||||
"""Type conversion utility class"""
|
||||
|
||||
_date_pattern = "yyyy-MM-dd"
|
||||
_date_len = len(_date_pattern)
|
||||
_date_time_without_second_pattern = "yyyy-MM-dd HH:mm"
|
||||
_date_time_without_second_len = len(_date_time_without_second_pattern)
|
||||
_date_time_pattern = "yyyy-MM-dd HH:mm:ss"
|
||||
|
||||
@staticmethod
|
||||
def to_str(s) -> str:
|
||||
"""Convert to string"""
|
||||
return str(s) if s is not None else None
|
||||
|
||||
@staticmethod
|
||||
def to_int(n) -> int:
|
||||
"""Convert to integer"""
|
||||
if n is None:
|
||||
return None
|
||||
if isinstance(n, int):
|
||||
return n
|
||||
if isinstance(n, float):
|
||||
return int(n)
|
||||
try:
|
||||
return int(str(n))
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def to_long(n) -> int:
|
||||
"""Convert to long integer"""
|
||||
return TypeKit.to_int(n)
|
||||
|
||||
@staticmethod
|
||||
def to_float(n) -> float:
|
||||
"""Convert to float"""
|
||||
if n is None:
|
||||
return None
|
||||
if isinstance(n, (int, float)):
|
||||
return float(n)
|
||||
try:
|
||||
return float(str(n))
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def to_double(n) -> float:
|
||||
"""Convert to double"""
|
||||
return TypeKit.to_float(n)
|
||||
|
||||
@staticmethod
|
||||
def to_decimal(n) -> Decimal:
|
||||
"""Convert to Decimal"""
|
||||
if n is None:
|
||||
return None
|
||||
if isinstance(n, Decimal):
|
||||
return n
|
||||
try:
|
||||
return Decimal(str(n))
|
||||
except (InvalidOperation, ValueError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def to_big_decimal(n) -> Decimal:
|
||||
"""Convert to BigDecimal"""
|
||||
return TypeKit.to_decimal(n)
|
||||
|
||||
@staticmethod
|
||||
def to_short(n) -> int:
|
||||
"""Convert to short integer"""
|
||||
result = TypeKit.to_int(n)
|
||||
return result if result is None else max(-32768, min(32767, result))
|
||||
|
||||
@staticmethod
|
||||
def to_byte(n) -> int:
|
||||
"""Convert to byte integer"""
|
||||
result = TypeKit.to_int(n)
|
||||
return result if result is None else max(-128, min(127, result))
|
||||
|
||||
@staticmethod
|
||||
def to_boolean(b) -> bool:
|
||||
"""Convert to boolean"""
|
||||
if b is None:
|
||||
return None
|
||||
if isinstance(b, bool):
|
||||
return b
|
||||
|
||||
if isinstance(b, (int, float)):
|
||||
if b == 1:
|
||||
return True
|
||||
elif b == 0:
|
||||
return False
|
||||
return bool(b)
|
||||
|
||||
if isinstance(b, str):
|
||||
s = str(b).lower().strip()
|
||||
if s in ("true", "1"):
|
||||
return True
|
||||
elif s in ("false", "0"):
|
||||
return False
|
||||
|
||||
return bool(b)
|
||||
|
||||
@staticmethod
|
||||
def to_number(n) -> float:
|
||||
"""Convert to number"""
|
||||
if n is None:
|
||||
return None
|
||||
if isinstance(n, (int, float)):
|
||||
return float(n)
|
||||
|
||||
s = str(n)
|
||||
if '.' in s:
|
||||
return float(s)
|
||||
else:
|
||||
try:
|
||||
return int(s)
|
||||
except ValueError:
|
||||
return float(s)
|
||||
|
||||
@staticmethod
|
||||
def to_date(d):
|
||||
"""Convert to datetime"""
|
||||
if d is None:
|
||||
return None
|
||||
|
||||
if isinstance(d, (datetime, date)):
|
||||
if isinstance(d, datetime):
|
||||
return d
|
||||
else:
|
||||
return datetime.combine(d, time.min)
|
||||
|
||||
if isinstance(d, str):
|
||||
s = str(d).strip()
|
||||
s_len = len(s)
|
||||
|
||||
if s_len <= TypeKit._date_len:
|
||||
return TypeKit._parse_date(s, TypeKit._date_pattern)
|
||||
elif s_len > TypeKit._date_time_without_second_len:
|
||||
return TypeKit._parse_date(s, TypeKit._date_time_pattern)
|
||||
else:
|
||||
colon_count = s.count(':')
|
||||
if colon_count == 2:
|
||||
return TypeKit._parse_date(s, TypeKit._date_time_pattern)
|
||||
elif colon_count == 1:
|
||||
return TypeKit._parse_date(s, TypeKit._date_time_without_second_pattern)
|
||||
|
||||
raise ValueError(f"Cannot convert to date: {d}")
|
||||
|
||||
@staticmethod
|
||||
def _parse_date(date_string: str, pattern: str) -> datetime:
|
||||
"""Parse date string with pattern"""
|
||||
try:
|
||||
# Simplified date parsing - supports common formats
|
||||
if pattern == "yyyy-MM-dd":
|
||||
parts = date_string.split('-')
|
||||
if len(parts) == 3:
|
||||
return datetime(int(parts[0]), int(parts[1]), int(parts[2]))
|
||||
elif pattern == "yyyy-MM-dd HH:mm:ss":
|
||||
date_part, time_part = date_string.split(' ')
|
||||
date_parts = date_part.split('-')
|
||||
time_parts = time_part.split(':')
|
||||
if len(date_parts) == 3 and len(time_parts) == 3:
|
||||
return datetime(
|
||||
int(date_parts[0]), int(date_parts[1]), int(date_parts[2]),
|
||||
int(time_parts[0]), int(time_parts[1]), int(time_parts[2])
|
||||
)
|
||||
elif pattern == "yyyy-MM-dd HH:mm":
|
||||
date_part, time_part = date_string.split(' ')
|
||||
date_parts = date_part.split('-')
|
||||
time_parts = time_part.split(':')
|
||||
if len(date_parts) == 3 and len(time_parts) == 2:
|
||||
return datetime(
|
||||
int(date_parts[0]), int(date_parts[1]), int(date_parts[2]),
|
||||
int(time_parts[0]), int(time_parts[1])
|
||||
)
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
# Fallback to try parsing with dateutil or regex
|
||||
raise ValueError(f"Cannot parse date: {date_string}")
|
||||
|
||||
@staticmethod
|
||||
def to_local_date_time(ldt):
|
||||
"""Convert to LocalDateTime"""
|
||||
if ldt is None:
|
||||
return None
|
||||
|
||||
if isinstance(ldt, datetime):
|
||||
return ldt
|
||||
|
||||
d = TypeKit.to_date(ldt)
|
||||
if d:
|
||||
return d
|
||||
|
||||
raise ValueError(f"Cannot convert to LocalDateTime: {ldt}")
|
||||
Reference in New Issue
Block a user