Merge pandas DataFrames from multiple construction sources. Handle different schemas, keys, and data quality issues.
数据来源:ClawHub。 在 ClawSkills 查看
选择你使用的 Agent
方法一:命令行安装(推荐)
推荐(无需提前安装 clawhub)
npx clawhub@latest --dir ~/.claude/skills install df-merger或使用 clawhub CLI(需提前安装)
clawhub --dir ~/.claude/skills install df-merger⚠️ 需要 Node.js 18+,没有 Node?请使用下方方法二直接下载 ZIP。 安装 Node.js →
方法二:手动下载安装(无需 Node)
下载 ZIP,解压后将文件夹放到以下路径,重启 Agent 即可:
安装路径
~/.claude/skills/df-merger/💡解压后将文件夹放到上方路径,重启 Agent 即可生效
--- name: "df-merger" description: "Merge pandas DataFrames from multiple construction sources. Handle different schemas, keys, and data quality issues." homepage: "https://datadrivenconstruction.io" metadata: {"openclaw": {"emoji": "🐼", "os": ["darwin", "linux", "win32"], "homepage": "https://datadrivenconstruction.io", "requires": {"bins": ["python3"]}}} ---
Construction projects combine data from BIM, schedules, costs, and sensors. This skill merges DataFrames from disparate sources with intelligent key matching and schema reconciliation.
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
from difflib import SequenceMatcher
class MergeStrategy(Enum):
"""DataFrame merge strategies."""
INNER = "inner" # Only matching rows
LEFT = "left" # All left, matching right
RIGHT = "right" # Matching left, all right
OUTER = "outer" # All rows from both
CROSS = "cross" # Cartesian product
@dataclass
class MergeResult:
"""Result of merge operation."""
merged_df: pd.DataFrame
matched_rows: int
left_only: int
right_only: int
merge_quality: float # 0-1 score
class ConstructionDFMerger:
"""Merge DataFrames from construction sources."""
# Common construction column name mappings
COLUMN_MAPPINGS = {
'element_id': ['elementid', 'elem_id', 'id', 'guid', 'globalid'],
'type_name': ['typename', 'type', 'element_type', 'category'],
'level': ['level', 'floor', 'storey', 'building_storey'],
'material': ['material', 'mat', 'material_name'],
'volume': ['volume', 'vol', 'volume_m3', 'qty_volume'],
'area': ['area', 'surface_area', 'qty_area', 'area_m2'],
'cost': ['cost', 'price', 'total_cost', 'amount'],
'task_id': ['task_id', 'activity_id', 'wbs', 'activity'],
'start_date': ['start', 'start_date', 'planned_start', 'begin'],
'end_date': ['end', 'end_date', 'planned_finish', 'finish']
}
def __init__(self):
self.column_cache: Dict[str, str] = {}
def find_common_key(self, df1: pd.DataFrame,
df2: pd.DataFrame) -> Optional[str]:
"""Find common key column between DataFrames."""
# Check exact matches first
common = set(df1.columns) & set(df2.columns)
if common:
# Prefer ID-like columns
for col in common:
if 'id' in col.lower() or 'code' in col.lower():
return col
return list(common)[0]
# Try semantic matching
for col1 in df1.columns:
for col2 in df2.columns:
if self._columns_match(col1, col2):
return col1
return None
def _columns_match(self, col1: str, col2: str) -> bool:
"""Check if column names are semantically similar."""
col1_lower = col1.lower().replace('_', '').replace('-', '')
col2_lower = col2.lower().replace('_', '').replace('-', '')
# Exact match after normalization
if col1_lower == col2_lower:
return True
# Check against mappings
for standard, variants in self.COLUMN_MAPPINGS.items():
if col1_lower in variants and col2_lower in variants:
return True
# Similarity check
similarity = SequenceMatcher(None, col1_lower, col2_lower).ratio()
return similarity > 0.8
def harmonize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""Standardize column names."""
df = df.copy()
rename_map = {}
for col in df.columns:
col_lower = col.lower().replace('_', '').replace('-', '')
for standard, variants in self.COLUMN_MAPPINGS.items():
if col_lower in variants:
rename_map[col] = standard
break
return df.rename(columns=rename_map)
def merge(self, left: pd.DataFrame,
right: pd.DataFrame,
on: Optional[str] = None,
left_on: Optional[str] = None,
right_on: Optional[str] = None,
how: MergeStrategy = MergeStrategy.LEFT,
harmonize: bool = True) -> MergeResult:
"""Merge two DataFrames."""
if harmonize:
left = self.harmonize_columns(left)
right = self.harmonize_columns(right)
# Determine merge keys
if on is None and left_on is None and right_on is None:
common_key = self.find_common_key(left, right)
if common_key is None:
raise ValueError("No common key found. Specify merge key manually.")
on = common_key
# Perform merge
merged = pd.merge(
left, right,
on=on,
left_on=left_on,
right_on=right_on,
how=how.value,
indicator=True,
suffixes=('_left', '_right')
)
# Calculate statistics
matched = len(merged[merged['_merge'] == 'both'])
left_only = len(merged[merged['_merge'] == 'left_only'])
right_only = len(merged[merged['_merge'] == 'right_only'])
# Quality score
total = len(left) + len(right)
quality = (matched * 2) / total if total > 0 else 0
# Clean up
merged = merged.drop('_merge', axis=1)
return MergeResult(
merged_df=merged,
matched_rows=matched,
left_only=left_only,
right_only=right_only,
merge_quality=round(quality, 2)
)
def merge_multiple(self, dfs: List[pd.DataFrame],
on: Optional[str] = None,
how: MergeStrategy = MergeStrategy.OUTER) -> pd.DataFrame:
"""Merge multiple DataFrames sequentially."""
if not dfs:
return pd.DataFrame()
result = dfs[0].copy()
for i, df in enumerate(dfs[1:], 1):
result_obj = self.merge(result, df, on=on, how=how)
result = result_obj.merged_df
return result
def fuzzy_merge(self, left: pd.DataFrame,
right: pd.DataFrame,
left_on: str,
right_on: str,
threshold: float = 0.8) -> pd.DataFrame:
"""Merge using fuzzy string matching."""
matches = []
left_values = left[left_on].dropna().unique()
right_values = right[right_on].dropna().unique()
for lval in left_values:
best_match = None
best_score = 0
for rval in right_values:
score = SequenceMatcher(None, str(lval).lower(),
str(rval).lower()).ratio()
if score > best_score and score >= threshold:
best_score = score
best_match = rval
if best_match:
matches.append({
'left_key': lval,
'right_key': best_match,
'match_score': best_score
})
match_df = pd.DataFrame(matches)
# Join using match mapping
left_with_key = left.merge(match_df, left_on=left_on, right_on='left_key', how='left')
result = left_with_key.merge(right, left_on='right_key', right_on=right_on, how='left')
return result
class BIMScheduleMerger(ConstructionDFMerger):
"""Specialized merger for BIM and schedule data."""
def merge_bim_schedule(self, bim_df: pd.DataFrame,
schedule_df: pd.DataFrame,
bim_type_col: str = 'Type Name',
schedule_wbs_col: str = 'WBS') -> pd.DataFrame:
"""Merge BIM elements with schedule activities."""
# This typically requires a mapping table
# For now, use fuzzy matching on descriptions
bim_df = self.harmonize_columns(bim_df)
schedule_df = self.harmonize_columns(schedule_df)
# Try to match type names to WBS descriptions
result = self.fuzzy_merge(
bim_df, schedule_df,
left_on=bim_type_col,
right_on=schedule_wbs_col,
threshold=0.6
)
return result
class CostQTOMerger(ConstructionDFMerger):
"""Merge cost data with quantity takeoffs."""
def merge_cost_qto(self, cost_df: pd.DataFrame,
qto_df: pd.DataFrame) -> pd.DataFrame:
"""Merge cost rates with QTO quantities."""
cost_df = self.harmonize_columns(cost_df)
qto_df = self.harmonize_columns(qto_df)
# Try common merge keys
for key in ['work_item_code', 'type_name', 'material', 'element_id']:
if key in cost_df.columns and key in qto_df.columns:
result = self.merge(cost_df, qto_df, on=key)
# Calculate extended costs
result.merged_df['extended_cost'] = (
result.merged_df.get('quantity', 0) *
result.merged_df.get('unit_price', 0)
)
return result.merged_df
# Fallback to fuzzy merge
return self.fuzzy_merge(
qto_df, cost_df,
left_on='type_name' if 'type_name' in qto_df.columns else qto_df.columns[0],
right_on='description' if 'description' in cost_df.columns else cost_df.columns[0]
)
merger = ConstructionDFMerger()
# Merge two DataFrames
result = merger.merge(bim_df, schedule_df)
print(f"Matched: {result.matched_rows}, Quality: {result.merge_quality}")
# Access merged data
merged = result.merged_df
bim_schedule = BIMScheduleMerger()
integrated = bim_schedule.merge_bim_schedule(bim_elements, schedule_activities)
cost_merger = CostQTOMerger()
priced_qto = cost_merger.merge_cost_qto(cost_database, quantities)
print(f"Total: ${priced_qto['extended_cost'].sum():,.2f}")
all_data = merger.merge_multiple(
[bim_df, schedule_df, cost_df, resource_df],
on='element_id'
)
安装 Df Merger 后,可以对 AI 说这些话来触发它
Help me get started with Df Merger
Explains what Df Merger does, walks through the setup, and runs a quick demo based on your current project
Use Df Merger to merge pandas DataFrames from multiple construction sources
Invokes Df Merger with the right parameters and returns the result directly in the conversation
What can I do with Df Merger in my data & analytics workflow?
Lists the top use cases for Df Merger, with example commands for each scenario
将技能文件夹放到 ~/.claude/skills/df-merger/ 目录(个人级,所有项目可用),或 .claude/skills/df-merger/(项目级)。重启 AI 客户端后,用 /df-merger 主动调用,或让 AI 根据上下文自动发现并使用。
Df Merger 支持 Claude、Cursor、OpenClaw,可与这些 AI 平台无缝集成,扩展其能力。
Df Merger 可免费安装使用。请查阅仓库了解许可证信息。
Merge pandas DataFrames from multiple construction sources. Handle different schemas, keys, and data quality issues.
Df Merger 属于「Data & Analytics」分类,该分类的技能帮助 AI 智能体在此领域执行专业任务。
Automate my data & analytics tasks using Df Merger
Identifies repetitive steps in your workflow and sets up Df Merger to handle them automatically