import base64
import hashlib
import hmac
import json
import os
import time
import urllib.parse
from queue import Queue

import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.common import exceptions
from selenium.webdriver import ActionChains, Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait


class amro():
    """Browser automation for the AMRO site: log in, scrape grids, notify.

    The instance owns one Selenium driver (Edge, falling back to Chrome) and
    one reusable ``ActionChains`` object.  Two queues are exposed as ``q`` and
    ``q2``: ``timerSingal`` puts ``"cz"`` on ``q`` while the driver is alive
    and ``"cz2"`` on ``q2`` when it has to be restarted.  Their consumers are
    not part of this file.

    NOTE(review): the login credentials below and the DingTalk secret/token in
    ``directSendMsg`` are hard-coded in source.  They are kept unchanged so
    behaviour is preserved, but should be moved to configuration.
    """

    # Output keys of one flight row, in the order the grid cells are read.
    _FLIGHT_COLUMNS = (
        "机号", "机型", "发动机", "航班号", "进港起飞", "进港到达时间", "航站",
        "离港起飞时间", "离港到达", "进港机位", "离港机位", "保留", "任务类型",
        "随机", "维修人员", "放行人员", "工作状态", "工作状态时间", "重要航班",
        "备降/返航", "延误", "附加信息",
    )
    _FLIGHT_DIR = './static/flightdata'
    _WO_MODE = "工作指令(WO)管理"

    def __init__(self):
        """Start a browser and initialise URL, credentials and signal queues."""
        self._startDriver()
        self.url = r"http://me.sichuanair.com/login.shtml"
        self.username = "022673"
        self.password = "yBQG71@8FC"
        self.activeMode = ""
        self.q = Queue()
        self.q2 = Queue()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _startDriver(self):
        """Create ``self.driver`` (Edge, else Chrome) and its ``ActionChains``."""
        try:
            self.driver = webdriver.Edge()
        except Exception:
            # Edge is unavailable on this machine; fall back to Chrome.
            self.driver = webdriver.Chrome()
        self.actions = ActionChains(self.driver)

    def _click(self, element):
        """Queue a click on *element* and perform it immediately."""
        self.actions.click(element)
        self.actions.perform()

    def _waitForGrid(self):
        """Block (up to 30 s) until the '.datagrid-mask-msg' overlay is gone."""
        WebDriverWait(self.driver, 30).until_not(
            EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))

    def _submitLoginForm(self):
        """Click 'singleSubmit', then type the username and password + ENTER."""
        loginbtn = self.driver.find_element(By.ID, 'singleSubmit')
        time.sleep(0.5)
        self._click(loginbtn)
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.NAME, "j_username"))
        ).send_keys(self.username)
        time.sleep(0.5)
        password_box = self.driver.find_element(By.NAME, "j_password")
        password_box.send_keys(self.password)
        time.sleep(0.5)
        password_box.send_keys(Keys.ENTER)

    def _clickMenuEntry(self, class_name, label, pause):
        """Click the first element of *class_name* whose text equals *label*.

        Every inspected entry's text is printed.  Sleeps *pause* seconds after
        a successful click; does nothing if no entry matches.
        """
        entries = WebDriverWait(self.driver, 10).until(
            EC.presence_of_all_elements_located((By.CLASS_NAME, class_name)))
        for entry in entries:
            text = entry.text
            print(text)
            if text == label:
                self._click(entry)
                time.sleep(pause)
                break

    def _runWOSearch(self):
        """Click the WO search button and wait for the grid to finish loading."""
        search = self.driver.find_element(
            By.CSS_SELECTOR, 'td:nth-child(10) > .searchBtn > span')
        self._click(search)
        self._waitForGrid()

    def _openWOGrid(self):
        """Enter iframe 1, pick the combobox option, search, and set the pager."""
        frames = WebDriverWait(self.driver, 30).until(
            EC.presence_of_all_elements_located((By.TAG_NAME, "iframe")))
        print(len(frames))
        self.driver.switch_to.frame(1)
        dropdown = WebDriverWait(self.driver, 30).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, '.tdr:nth-child(8) .textbox-icon')))
        self._click(dropdown)
        time.sleep(1)
        option = self.driver.find_element(By.ID, '_easyui_combobox_i1_1')
        self._click(option)
        time.sleep(1)
        self._runWOSearch()
        # Type 999 into the pager input (presumably to get all rows on one
        # page -- confirm against the site).
        pager = WebDriverWait(self.driver, 30).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, '#tt > .panel td:nth-child(1) > input')))
        pager.clear()
        pager.send_keys('999')
        pager.send_keys(Keys.ENTER)
        time.sleep(1)
        self._waitForGrid()
        time.sleep(1)

    def _dumpWOGrid(self):
        """Read the WO grid's outerHTML, print it, save it, and return it.

        The HTML is written as UTF-8 to ``work<unix-seconds>.txt`` in the
        current working directory.
        """
        content = self.driver.find_element(
            By.CSS_SELECTOR, '#tt > .panel .datagrid-view2 > .datagrid-body'
        ).get_attribute("outerHTML")
        print(content)
        with open('work{}.txt'.format(str(int(time.time()))), 'wb') as f:
            f.write(content.encode('utf-8'))
        return content

    @staticmethod
    def _cellText(td):
        """Return the innermost content of a grid cell, or "" if it is empty.

        Tries three levels of nesting first, then two, mirroring the two
        markup shapes the grid is expected to produce.
        """
        try:
            return td.contents[0].contents[0].contents[0]
        except (IndexError, AttributeError, TypeError):
            pass
        try:
            return td.contents[0].contents[0]
        except (IndexError, AttributeError, TypeError):
            return ""

    def _collectFlightData(self, args, show_html):
        """Shared implementation of ``getFlightData`` and ``searchFlight``.

        Args:
            args: the caller's ``*args`` tuple; ``args[0]``, if present, is
                typed into the date box and used in the dump file name.
            show_html: print the raw grid HTML when true.

        Returns:
            A list of dicts keyed by ``_FLIGHT_COLUMNS``, one per grid row.
        """
        self.reStartWebDrive('航班动态')
        self.isLogining('航班动态')
        self.driver.switch_to.parent_frame()
        frames = WebDriverWait(self.driver, 30).until(
            EC.presence_of_all_elements_located((By.TAG_NAME, "iframe")))
        self.driver.switch_to.frame(1)
        print(len(frames))
        self._waitForGrid()

        notview = self.driver.find_element(By.ID, 'notView')
        if notview.is_selected():
            print("取消勾选")
            self._click(notview)
            time.sleep(1)

        searchbtn = self.driver.find_element(By.CSS_SELECTOR, '.searchBtn > span')
        self._click(searchbtn)
        time.sleep(1)
        self._waitForGrid()

        if args:
            date = self.driver.find_element(
                By.CSS_SELECTOR, '.datebox > .textbox-text')
            date.clear()
            time.sleep(0.5)
            date.send_keys(args[0])
            time.sleep(0.5)
            self._click(searchbtn)
            self._waitForGrid()

        flightDataContent = self.driver.find_element(
            By.CSS_SELECTOR, '.datagrid-view2 > .datagrid-body'
        ).get_attribute("outerHTML")
        if show_html:
            print(flightDataContent)

        stamp = args[0] if args else time.strftime("%Y%m%d", time.localtime())
        # Create the dump directory on first use instead of failing in open().
        os.makedirs(self._FLIGHT_DIR, exist_ok=True)
        with open('{}/flight{}.txt'.format(self._FLIGHT_DIR, stamp), 'wb') as f:
            f.write(flightDataContent.encode('utf-8'))

        soup = BeautifulSoup(flightDataContent, "html5lib")
        flight_list = []
        for tr in soup.find_all('tr'):
            cells = [self._cellText(td) for td in tr.find_all('td')]
            if len(cells) < len(self._FLIGHT_COLUMNS):
                # A short row cannot be mapped onto the columns; skip it
                # rather than abort the whole scrape with IndexError.
                print('skip row with {} cells'.format(len(cells)))
                continue
            flight_list.append(dict(zip(self._FLIGHT_COLUMNS, cells)))
        print(flight_list)
        return flight_list

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def openWeb(self, url):
        """Navigate to *url* and put ``"cz"`` on ``self.q``."""
        self.driver.get(url)
        self.q.put("cz")

    def openAmro(self):
        """Open the login page and log in with the stored credentials."""
        self.driver.get(self.url)
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.ID, 'singleSubmit')))
        self.timerSingal()
        self._submitLoginForm()

    def WO(self):
        """Open the work-order (WO) page, query it and dump the grid HTML."""
        self.timerSingal()
        self.driver.switch_to.parent_frame()
        time.sleep(0.5)
        self._clickMenuEntry('nav-label', '航线维修', 2)
        self._clickMenuEntry('J_menuItem', self._WO_MODE, 1)
        self._openWOGrid()
        self._dumpWOGrid()

    def openMaintitle(self, first):
        """Click the top-level navigation entry whose text equals *first*."""
        self.timerSingal()
        self.driver.switch_to.parent_frame()
        time.sleep(0.5)
        self._clickMenuEntry('nav-label', first, 2)

    def openSubTitle(self, second):
        """Click the sub-menu entry whose text equals *second*."""
        self.timerSingal()
        self.driver.switch_to.parent_frame()
        self._clickMenuEntry('J_menuItem', second, 1)

    def getActiveTitle(self):
        """Print and return the text of the '.active > span' element."""
        self.timerSingal()
        self.driver.switch_to.parent_frame()
        activeTitle = self.driver.find_element(By.CSS_SELECTOR, ".active > span")
        print(activeTitle.text)
        return activeTitle.text

    def changeToTitle(self, title):
        """Probe for the second or third menu tab.

        NOTE(review): this only checks that a tab exists and prints a message
        when neither is found; it never clicks a tab or compares against
        *title*.  It looks unfinished.
        """
        self.timerSingal()
        self.driver.switch_to.parent_frame()
        self.getActiveTitle()
        for position in (2, 3):
            try:
                self.driver.find_element(
                    By.CSS_SELECTOR,
                    '.J_menuTab:nth-child({}) > span'.format(position))
                return
            except Exception:
                continue
        print('未找到 {}'.format(title))

    def getWOData(self):
        """Query the WO grid and return its rows as a list of dicts.

        The first call (or the first after a restart/re-login) performs the
        full grid setup; later calls only re-run the search.

        Returns:
            A list of dicts with keys '状态', '机号', '时间' and '内容'.
            Rows that do not have enough cells are skipped.
        """
        self.timerSingal()
        self.reStartWebDrive()
        self.isLogining()
        if self.activeMode == self._WO_MODE:
            self._runWOSearch()
        else:
            self._openWOGrid()
            self.activeMode = self._WO_MODE

        content = self._dumpWOGrid()
        soup = BeautifulSoup(content, 'html5lib')
        data_list = []
        for tr in soup.find_all('tr'):
            tds = tr.find_all('td')
            # Handle errors per row so one malformed row does not discard
            # every row that follows it.
            try:
                data_list.append({
                    '状态': tds[3].contents[0].contents[0],
                    '机号': tds[4].contents[0].contents[0],
                    '时间': tds[8].contents[0].contents[0].replace('\n', '').replace(' ', ''),
                    '内容': tds[10].contents[0].contents[0].replace('\n', '').replace(' ', ''),
                })
            except IndexError as e:
                print(e)
        print(data_list)
        return data_list

    def getFlightData(self, *args):
        """Scrape the flight grid, printing the raw HTML.

        Args:
            *args: optional; ``args[0]`` is typed into the date box and used
                in the dump file name (today's date is used otherwise).

        Returns:
            A list of dicts keyed by the 22 flight column names.

        NOTE(review): if the driver is dead, ``timerSingal`` restarts it on
        the WO page (it calls ``reStartWebDrive()`` without arguments), so the
        following restart for '航班动态' becomes a no-op.  Confirm that is the
        intended behaviour.
        """
        self.timerSingal()
        return self._collectFlightData(args, show_html=True)

    # Selectors used by the flight page:
    #   .datagrid-view2 > .datagrid-body   data
    #   .searchBtn > span                  search button
    #   .datagrid-mask-msg                 "searching" overlay
    def searchFlight(self, *args):
        """Same as ``getFlightData`` but without the initial timer signal and
        without printing the raw HTML."""
        return self._collectFlightData(args, show_html=False)

    def isLogining(self, *args):
        """Re-login if the session-expired dialog ('.layui-layer-content') is shown.

        Args:
            *args: optional; ``args[0]`` is the sub-menu to open after the
                re-login (defaults to the WO management page).

        When the dialog is absent, "已登录" and the lookup error are printed
        and nothing else happens.
        """
        self.timerSingal()
        try:
            notLogining = self.driver.find_element(
                By.CSS_SELECTOR, '.layui-layer-content')
            print(notLogining.text)
            btn = self.driver.find_element(By.CSS_SELECTOR, '.layui-layer-btn0')
            # The click must actually be performed before waiting for the
            # login page; previously it was only queued.
            self._click(btn)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, 'singleSubmit')))
            self._submitLoginForm()
            self.openMaintitle("航线维修")
            self.openSubTitle(args[0] if args else self._WO_MODE)
            # The page was re-opened, so any cached grid state is stale.
            self.activeMode = ""
        except exceptions.NoSuchElementException as e:
            print("已登录")
            print(e)

    def quitWeb(self):
        """Quit the browser and forget the driver."""
        self.driver.quit()
        self.driver = None

    def webDriveIsAvailable(self):
        """Return True if the driver can still execute JavaScript.

        On failure the driver is quit (best effort) and ``self.driver`` is set
        to None so the next call short-circuits to False.
        """
        if self.driver is None:
            return False
        try:
            self.driver.execute_script('javascript:void(0);')
        except Exception:
            # The browser refused to run the script: treat the session as dead.
            try:
                self.driver.quit()
            except Exception as quit_error:
                # Quitting a dead session may itself fail; that is expected.
                print(quit_error)
            finally:
                self.driver = None
            return False
        return True

    def reStartWebDrive(self, *args):
        """Start a fresh browser and log in again if the driver is unusable.

        Args:
            *args: optional; ``args[0]`` is the sub-menu to open after login
                (defaults to the WO management page).
        """
        if self.webDriveIsAvailable():
            return
        self._startDriver()
        self.openAmro()
        self.openMaintitle("航线维修")
        self.openSubTitle(args[0] if args else self._WO_MODE)
        self.activeMode = ""

    def timerSingal(self):
        """Emit a keep-alive signal, or a restart signal plus an actual restart."""
        print("发送指令---")
        if self.webDriveIsAvailable():
            print("发送持续指令")
            self.q.put("cz")
        else:
            print("发送重启指令")
            self.q2.put("cz2")
            self.reStartWebDrive()

    def directSendMsg(self, msg, atuser):
        """Send *msg* as a markdown message through the DingTalk robot webhook.

        Args:
            msg: markdown text of the message.
            atuser: iterable of mobile numbers to @-mention.
        """
        timestamp = str(round(time.time() * 1000))
        secret = 'SEC185f68b4fbce62bb7cfc72ef3d84444e7b9781e602716af36c50bf52c8cee791'
        string_to_sign = '{}\n{}'.format(timestamp, secret)
        hmac_code = hmac.new(
            secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

        headers = {'Content-Type': 'application/json'}
        # '&timestamp=' had been corrupted into '×tamp=' (HTML-entity
        # decoding of '&times'), which dropped the timestamp parameter.
        webhook = (
            'https://oapi.dingtalk.com/robot/send'
            '?access_token=56a4674577a0f034fda2ec99cb5e8c31d9e2cb53151cd019691ce62430e9da39'
            '&timestamp=' + timestamp + "&sign=" + sign
        )
        atTittle = "".join("@{}".format(n) for n in atuser)
        data = {
            "msgtype": "markdown",
            "markdown": {
                "title": "定时通知",
                "text": "{}{}".format(atTittle, msg)
            },
            "at": {
                "atMobiles": atuser,
                "isAtAll": False
            }
        }
        # A timeout keeps an unreachable endpoint from hanging the caller.
        res = requests.post(webhook, data=json.dumps(data), headers=headers, timeout=10)
        print(res.text)

    def searchJob(self):
        """Fetch the WO rows and push them to DingTalk as one message."""
        data_list = self.getWOData()
        nmsg = "".join(
            self.createMsg(i['机号'], i['状态'], i['时间'], i['内容'])
            for i in data_list
        )
        self.directSendMsg(nmsg, ['13032201605'])

    def createMsg(self, bnum, state, times, cont):
        """Format one WO row as a markdown fragment."""
        # NOTE(review): the line breaks were restored by hand (markdown
        # headings must start a line); the exact original whitespace was lost.
        MSG = """
#### 机号:{}
##### 状态:{}
##### 时间:{}
##### 内容:{}
###### ------------------------
""".format(bnum, state, times, cont)
        return MSG


if __name__ == '__main__':
    AMRO = amro()
    AMRO.openAmro()
    AMRO.openMaintitle("航线维修")
    AMRO.openSubTitle("航班动态")
    AMRO.getFlightData()