警告
本文最后更新于 2025-05-07,文中内容可能已过时。
Docker部署
前台运行(调试模式)
前台运行模式适合开发和调试,退出容器时会被自动删除:
1
2
3
4
5
|
docker run --rm -it --init \
--user pwuser --workdir /home/pwuser \
-p 3000:3000 \
mcr.microsoft.com/playwright:v1.51.0-noble \
/bin/sh -c "npx -y playwright@1.51.0 run-server --port 3000 --host 0.0.0.0"
|
后台运行(生产模式)
后台运行模式适合生产环境,容器会持续运行:
1
2
3
4
5
6
|
docker run -d -it --init \
--user pwuser --workdir /home/pwuser \
--add-host hostmachine:host-gateway \
-p 3000:3000 \
mcr.microsoft.com/playwright:v1.52.0-noble \
/bin/sh -c "npx -y playwright@1.52.0 run-server --port 3000 --host 0.0.0.0"
|
💡 使用提示: 如果 CI/脚本环境不支持 \ 续行符,可以直接删除换行和 \ 符号,合并成单行命令。
关键参数说明
| 参数 | 说明 |
| --- | --- |
| `--rm` | 容器退出时自动删除,避免资源占用 |
| `-it` | `-i` 保持输入流开放,`-t` 分配伪终端 |
| `--init` | 使用轻量级初始化进程处理信号,防止僵尸进程 |
| `-p 3000:3000` | 映射主机端口到容器端口 |
| `--user pwuser` | 使用非 root 用户运行,提高安全性 |
| `--workdir /home/pwuser` | 设置容器工作目录 |
命令执行流程
该命令将执行以下操作:
- 启动容器: 基于 mcr.microsoft.com/playwright:v1.52.0-noble 镜像
- 端口映射: 将主机 3000 端口映射到容器 3000 端口
- 用户权限: 使用 pwuser 用户而非 root 运行
- 环境配置: 设置工作目录为 /home/pwuser
- 服务启动: 通过 npx 启动 Playwright 服务器,监听所有网络接口
Python示例
测试用例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
import unittest
from playwright.async_api import async_playwright
class TestPlaywright(unittest.IsolatedAsyncioTestCase):
    """Async tests that drive a remote Playwright browser over WebSocket."""

    async def test_playwright(self):
        """Fetch a page and print its complete HTML content."""
        async with async_playwright() as pw:
            remote = await pw.chromium.connect("ws://ip:3000/")
            tab = await remote.new_page()
            await tab.goto("https://example.com")
            html = await tab.content()
            print(html)
            await remote.close()

    async def test_playwright_extract_txt(self):
        """Fetch a page and print only its visible text, no HTML tags."""
        async with async_playwright() as pw:
            remote = await pw.chromium.connect("ws://ip:3000/")
            tab = await remote.new_page()
            await tab.goto("https://example.com")
            # Pull the rendered plain text via in-page JavaScript.
            plain = await tab.evaluate('''() => {
                return document.body.innerText || document.documentElement.innerText;
            }''')
            # Variant that also collapses whitespace (blank lines, indentation):
            # plain = await tab.evaluate('''() => {
            #     return document.body.innerText.replace(/\\s+/g, ' ').trim();
            # }''')
            print(plain)
            await remote.close()


if __name__ == "__main__":
    unittest.main()
|
设置请求头
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
async def test_playwright():
    """Open a page on a remote browser, sending custom HTTP headers."""
    async with async_playwright() as pw:
        # Attach to the remote Playwright server.
        remote = await pw.chromium.connect("ws://ip:3000/")
        # A dedicated context carries the extra headers on every request.
        ctx = await remote.new_context(extra_http_headers={
            "User-Agent": "MyCustomUserAgent/1.0",
            "Authorization": "Bearer YOUR_TOKEN_HERE",
            "X-Custom-Header": "SomeValue"
        })
        tab = await ctx.new_page()
        await tab.goto("https://example.com")
        # Grab the page content (or perform other operations here).
        html = await tab.content()
        print(html)
        # Dispose of the context only; the remote browser instance stays up.
        await ctx.close()
        # await remote.close()  # not recommended for a shared remote browser
|
抓取多个网页
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import asyncio
from playwright.async_api import async_playwright
async def test_playwright():
    """Scrape several URLs through one remote browser context.

    Connects to a remote Playwright server, creates a context with custom
    HTTP headers, then visits each URL in turn and prints its HTML.
    A failure on one URL is reported and does not abort the remaining URLs.
    """
    urls = [
        "https://example.com",
        "https://example.org",
        "https://example.net"
    ]
    async with async_playwright() as p:
        context = None
        try:
            # Connect to the remote browser.
            browser = await p.chromium.connect("ws://ip:3000/")
            # One shared context carries the custom headers for all pages.
            context = await browser.new_context(extra_http_headers={
                "User-Agent": "MyCustomUserAgent/1.0",
                "Authorization": "Bearer YOUR_TOKEN_HERE",
                "X-Custom-Header": "SomeValue"
            })
            for url in urls:
                # Bug fix: reset per iteration so the finally below never
                # re-closes the previous iteration's page (the original
                # `'page' in locals()` check could do exactly that when
                # new_page() failed).
                page = None
                try:
                    page = await context.new_page()
                    await page.goto(url)
                    # Grab the page content (or perform other operations here).
                    content = await page.content()
                    print(f"Content of {url}:")
                    print(content)
                except Exception as e:
                    print(f"Error while processing {url}: {e}")
                finally:
                    # Close only the page opened in THIS iteration.
                    if page is not None:
                        await page.close()
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            # Always release the context, even if connect/new_context failed.
            if context is not None:
                await context.close()

# Run the async entry point.
asyncio.run(test_playwright())
|
完整样例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
"""
抓取url网址内容
"""
async def fetch_page(doc:Dict[str,Any], currLevel: int = 1, maxLevel: int = 1, kbId: str = None, white_list: List[str] = None, loadURL:Dict[str,int]={}, expression:List[str] = None, headers: Dict[str, Any] ={}):
url = doc.get("path")
if currLevel > maxLevel:
return
if loadURL.get(url) is not None:
return
loadURL[url] = 1
# 检查是否有文件被上传
# 使用 Playwright 抓取网页
async with async_playwright() as p:
try:
browser = await p.chromium.connect(yamlConfig.get("playwright").get("url"))
# 创建带有自定义 headers 的上下文
context = await browser.new_context(extra_http_headers=headers)
# 在该上下文中打开新页面
page = await context.new_page()
# networkidle:至少500毫秒没有网络连接活动时,Playwright 将认为页面已加载完毕
await page.goto(url, wait_until='networkidle')
raw_html = await page.content() # HTML 内容(主要用于获取页面标题)
links = [] # 页面中的链接
full_text = "" # 纯文本
# 如果有XPath表达式
if len(expression) > 0 :
# 处理每个 XPath 表达式
for xpath in expression:
elements_html = await get_elements_by_xpath(page, xpath)
if not elements_html:
continue
# 每个 XPath 可能匹配多个元素
for html in elements_html:
soup = BeautifulSoup(html, 'lxml')
# 提取链接并加入到全局links列表中
links.extend([a.get('href') for a in soup.find_all('a', href=True) if a.get('href') not in links])
# 提取纯文本并拼接到full_text
full_text += soup.get_text(separator=' ', strip=True) + " "
else:
# 使用 BeautifulSoup 解析 HTML
soup = BeautifulSoup(raw_html, "lxml")
# 提取页面中的所有链接
links = {a.get('href') for a in soup.find_all('a', href=True)}
# 纯文本内容
full_text = await page.evaluate('''() => {
return document.body.innerText.replace(/\\s+/g, ' ').trim();
}''')
# 关闭页面
await page.close()
if len(full_text) == 0:
return ReturnDatas.ErrorResponse(message="The uploaded webpage text is empty!")
# 查找 <title> 标签并获取其内容
start_title = raw_html.find('<title>')
end_title = raw_html.find('</title>')
if start_title != -1 and end_title != -1:
# 提取 <title> 中间的文本内容
title = raw_html[start_title + 7:end_title].strip()
else:
title = "No Title Found"
if len(title) == 0:
return ReturnDatas.ErrorResponse(message="The title is empty!")
except Exception as e:
core_logger.exception(e)
traceback.print_exc()
finally:
if 'context' in locals():
await context.close()
"""
根据 XPath 表达式获取标签
"""
async def get_elements_by_xpath(page, xpath_expr):
# 使用 page.locator() 方法结合 XPath 选择器
locator = page.locator(f'xpath={xpath_expr}')
# 获取所有匹配的元素数量
count = await locator.count()
if count == 0:
return []
results = []
for i in range(count):
# 获取每个元素的 HTML 内容
html = await locator.nth(i).evaluate('e => e.outerHTML')
results.append(html)
return results
|