kai-api-gateway / copilot_portal.py
KiWA001's picture
Add enhanced stealth and immediate screenshot after click
3db3e6c
"""
Interactive Browser Portal for Copilot
--------------------------------------
Allows admin to interact with the browser directly through /qazmlp
"""
import asyncio
import logging
from typing import Optional
from playwright.async_api import async_playwright, Page, BrowserContext
logger = logging.getLogger("kai_api.copilot_portal")
class CopilotPortal:
"""Manages an interactive browser session for Copilot."""
def __init__(self):
self.browser = None
self.context: Optional[BrowserContext] = None
self.page: Optional[Page] = None
self.playwright = None
self.is_initialized = False
self.last_screenshot = "/tmp/copilot_portal.png"
self.message_queue = []
self.last_activity = None
def is_running(self) -> bool:
"""Check if the portal is currently running."""
if not self.is_initialized:
return False
if not self.browser:
return False
try:
# Check if browser is still connected
return self.browser.is_connected()
except:
return False
async def initialize(self):
"""Initialize the browser and navigate to Copilot with enhanced stealth."""
if self.is_initialized:
return
try:
logger.info("🚀 Portal: Launching browser with stealth...")
self.playwright = await async_playwright().start()
# Enhanced browser args for stealth
self.browser = await self.playwright.chromium.launch(
headless=True,
args=[
"--disable-blink-features=AutomationControlled",
"--disable-features=IsolateOrigins,site-per-process",
"--disable-site-isolation-trials",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--disable-web-security",
"--disable-features=BlockInsecurePrivateNetworkRequests",
"--disable-features=InterestCohort",
"--window-size=1280,800",
"--start-maximized",
"--force-color-profile=srgb",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-renderer-backgrounding",
],
)
# More realistic browser context
self.context = await self.browser.new_context(
viewport={"width": 1280, "height": 800},
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
locale="en-US",
timezone_id="America/New_York",
geolocation={"latitude": 40.7128, "longitude": -74.0060}, # NYC
permissions=["geolocation"],
color_scheme="light",
reduced_motion="no-preference",
)
# Enhanced stealth script
await self.context.add_init_script("""
// Override navigator properties
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
// Override permissions
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
// Add Chrome runtime
window.chrome = {
runtime: {
OnInstalledReason: {
CHROME_UPDATE: "chrome_update",
INSTALL: "install",
SHARED_MODULE_UPDATE: "shared_module_update",
UPDATE: "update"
},
OnRestartRequiredReason: {
APP_UPDATE: "app_update",
OS_UPDATE: "os_update",
PERIODIC: "periodic"
},
PlatformArch: {
ARM: "arm",
ARM64: "arm64",
MIPS: "mips",
MIPS64: "mips64",
X86_32: "x86-32",
X86_64: "x86-64"
},
PlatformNaclArch: {
ARM: "arm",
MIPS: "mips",
MIPS64: "mips64",
MIPS64_EL: "mips64el",
ARM64: "arm64",
X86_32: "x86-32",
X86_64: "x86-64"
},
PlatformOs: {
ANDROID: "android",
CROS: "cros",
LINUX: "linux",
MAC: "mac",
OPENBSD: "openbsd",
WIN: "win"
},
RequestUpdateCheckStatus: {
NO_UPDATE: "no_update",
THROTTLED: "throttled",
UPDATE_AVAILABLE: "update_available"
}
}
};
// Override WebGL
const getParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = function(parameter) {
if (parameter === 37445) {
return 'Intel Inc.';
}
if (parameter === 37446) {
return 'Intel Iris OpenGL Engine';
}
return getParameter(parameter);
};
""")
self.page = await self.context.new_page()
# Navigate to Copilot
logger.info("Portal: Navigating to Copilot...")
await self.page.goto("https://copilot.microsoft.com/", timeout=60000)
await asyncio.sleep(5)
self.is_initialized = True
logger.info("✅ Portal: Browser ready!")
# Take initial screenshot
await self.take_screenshot()
except Exception as e:
logger.error(f"Failed to initialize portal: {e}")
raise
async def take_screenshot(self) -> str:
"""Take a screenshot of the current page state."""
if not self.page:
return ""
try:
await self.page.screenshot(path=self.last_screenshot, full_page=False)
return self.last_screenshot
except Exception as e:
logger.error(f"Screenshot failed: {e}")
return ""
async def get_page_content(self) -> str:
"""Get current page HTML content for debugging."""
if not self.page:
return ""
try:
return await self.page.content()
except:
return ""
async def send_message(self, message: str) -> str:
"""Send a message through the portal."""
if not self.page:
return "Error: Browser not initialized"
try:
# Find input field
input_selectors = [
'textarea',
'div[contenteditable="true"]',
'[data-testid="chat-input"]',
'[role="textbox"]',
]
input_found = False
for selector in input_selectors:
try:
el = await self.page.wait_for_selector(selector, timeout=5000)
if el:
# Click and type
await el.click()
await self.page.keyboard.type(message, delay=10)
await asyncio.sleep(0.5)
await self.page.keyboard.press("Enter")
input_found = True
logger.info(f"Portal: Message sent via {selector}")
break
except:
continue
if not input_found:
return "Error: Could not find input field"
# Wait for response
await asyncio.sleep(3)
# Try to extract response
response_text = await self._extract_response()
# Take screenshot after response
await self.take_screenshot()
return response_text or "No response yet, check screenshot"
except Exception as e:
logger.error(f"Portal send error: {e}")
await self.take_screenshot()
return f"Error: {str(e)}"
async def _extract_response(self) -> str:
"""Try to extract the latest response."""
try:
return await self.page.evaluate("""
() => {
const selectors = [
'[data-message-author-role="assistant"]',
'.message-content',
'.ac-textBlock',
'[class*="response"]',
'article',
];
for (const sel of selectors) {
const els = document.querySelectorAll(sel);
if (els.length > 0) {
const last = els[els.length - 1];
const text = last.innerText || last.textContent || '';
if (text.trim().length > 10) return text.trim();
}
}
return '';
}
""")
except:
return ""
async def click_new_chat(self):
"""Click the New Chat button."""
if not self.page:
return
try:
btn = await self.page.query_selector('button:has-text("New chat")')
if btn:
await btn.click()
await asyncio.sleep(2)
await self.take_screenshot()
except Exception as e:
logger.error(f"Click new chat error: {e}")
async def refresh_page(self):
"""Refresh the page."""
if not self.page:
return
try:
await self.page.reload()
await asyncio.sleep(5)
await self.take_screenshot()
except Exception as e:
logger.error(f"Refresh error: {e}")
async def click_at_coordinates(self, x: float, y: float):
"""Click at specific coordinates on the page and immediately take screenshot."""
if not self.page:
logger.error("Portal: No page available for click")
return
try:
logger.info(f"Portal: Clicking at coordinates ({x}, {y})")
# First try clicking on main page
await self.page.mouse.click(x, y)
# Wait a short moment for the page to react
await asyncio.sleep(0.5)
# Check if there's an iframe at that location (CAPTCHA is often in iframe)
iframe_clicked = await self._try_click_iframe(x, y)
if iframe_clicked:
logger.info("Portal: Clicked inside iframe")
# Wait a bit more for any CAPTCHA processing
await asyncio.sleep(1)
# Take screenshot immediately
await self.take_screenshot()
logger.info("Portal: Click completed, screenshot taken")
except Exception as e:
logger.error(f"Portal click error: {e}")
# Still try to take screenshot on error
try:
await self.take_screenshot()
except:
pass
async def _try_click_iframe(self, x: float, y: float) -> bool:
"""Try to click inside iframes at the given coordinates."""
try:
# Get all iframes
iframes = await self.page.query_selector_all('iframe')
for iframe in iframes:
try:
# Check if iframe is visible and contains the coordinates
box = await iframe.bounding_box()
if box and box['x'] <= x <= box['x'] + box['width'] and box['y'] <= y <= box['y'] + box['height']:
# Click inside the iframe
frame = await iframe.content_frame()
if frame:
# Calculate relative coordinates
rel_x = x - box['x']
rel_y = y - box['y']
await frame.mouse.click(rel_x, rel_y)
return True
except:
continue
return False
except Exception as e:
logger.error(f"Iframe click error: {e}")
return False
async def close(self):
"""Close the browser."""
try:
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
self.is_initialized = False
logger.info("Portal: Browser closed")
except Exception as e:
logger.error(f"Close error: {e}")
# Global portal instance
_portal: Optional[CopilotPortal] = None
def get_portal() -> CopilotPortal:
"""Get or create the global portal instance."""
global _portal
if _portal is None:
_portal = CopilotPortal()
return _portal