当前位置:  开发笔记 > 编程语言 > 正文

如何查看文件以进行更改?

如何解决《如何查看文件以进行更改?》经验,为你挑选了13个好方法。

我有一个日志文件由另一个进程编写,我想要观察更改.每次发生变化时,我都想读取新数据,对其进行一些处理.

最好的方法是什么?我希望PyWin32库中有一些钩子.我找到了这个win32file.FindNextChangeNotification功能,但不知道如何让它看到一个特定的文件.

如果有人做过这样的事情,我会非常感激听到......

[编辑]我应该提到我是在一个不需要轮询的解决方案之后.

[编辑]诅咒!这似乎不适用于映射的网络驱动器.我猜Windows不会像在本地磁盘上那样"听到"文件的任何更新.



1> simao..:

你尝试过使用Watchdog吗?

用于监视文件系统事件的Python API库和shell实用程序.

目录监控变得简单

跨平台API.

一个shell工具,用于运行命令以响应目录更改.

通过Quickstart中的一个简单示例快速入门 ...


可以用`easy_install`安装?校验.免费许可证?[检查](https://github.com/gorakhargosh/watchdog/blob/master/LICENSE).解决大平台上的问题?[检查](http://packages.python.org/watchdog/installation.html#supported-platforms-and-caveats).我赞同这个答案.只需注意:[项目页面上的示例](http://packages.python.org/watchdog/quickstart.html#a-simple-example)无法开箱即用.使用[他们的github上的那个](https://github.com/gorakhargosh/watchdog#example-api-usage)代替.
我们使用看门狗.我们可能会切换到QFileSystemWatcher.只是一个公平的警告 - 看门狗是好的,但在所有平台上都远非完美(此时).每个操作系统都有它的特性.所以,除非你致力于让它变得完美,否则你会把你的头发拉出来.如果您只想观看10个左右的文件,我会进行民意调查.操作系统磁盘缓存非常成熟,Watchdog无论如何都涉及轮询API.它主要用于观看巨大的文件夹结构恕我直言.
我对看门狗的抱怨是,它有很多依赖性.当然,比PyQt要少,但它不起作用,感觉就像最小的,最好的做法,做一个就业和做对的解决方案.

2> Deestan..:

如果轮询对您来说足够好,我只会观察"修改时间"文件统计信息是否发生变化.阅读它:

os.stat(filename).st_mtime

(另请注意,Windows本机更改事件解决方案在所有情况下都不起作用,例如在网络驱动器上.)

import os

class Monkey(object):
    def __init__(self):
        self._cached_stamp = 0
        self.filename = '/path/to/file'

    def ook(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...



3> Horst Gutman..:

您是否已查看http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html上提供的文档?如果您只需要它在Windows下工作,第二个示例似乎正是您想要的(如果您将目录的路径与您要观看的文件之一进行交换).

否则,轮询可能是唯一真正独立于平台的选项.

注意:我没有尝试过任何这些解决方案.


这个答案是特定于Windows的,但似乎这个问题的一些跨平台解决方案也已发布在这里.

4> 小智..:

如果您需要多平台解决方案,请检查QFileSystemWatcher.这是一个示例代码(未经过消毒):

from PyQt4 import QtCore

@QtCore.pyqtSlot(str)
def directory_changed(path):
    print('Directory Changed!!!')

@QtCore.pyqtSlot(str)
def file_changed(path):
    print('File Changed!!!')

fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)


我认为这很可能是最好的答案,因为他们要么a)依赖Win32的FileSystemwatcher对象而不能移植或b)轮询文件(这对性能有害并且不会扩展).很遗憾Python没有内置这个工具,因为PyQt是一个巨大的依赖,如果您使用的是QFileSystemWatcher类.
我喜欢这个解决方案.我想指出你需要一个QApplication实例才能工作,我在导入下面添加了"app = QtGui.QApplication(sys.argv)",然后在信号连接后添加了"app.exec_()".

5> Maxime..:

它应该不适用于Windows(可能使用cygwin?),但对于unix用户,您应该使用"fcntl"系统调用.这是Python中的一个例子.如果你需要用C语言写它(相同的函数名),它的代码大致相同

import time
import fcntl
import os
import signal

FNAME = "/HOME/TOTO/FILETOWATCH"

def handler(signum, frame):
    print "File %s modified" % (FNAME,)

signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME,  os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
            fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

while True:
    time.sleep(10000)


顺便说一句,在OS X上不起作用.
在ext4文件系统(在Ubuntu 10.04上)上使用Linux内核2.6.31的魅力,虽然仅用于目录 - 如果我将它与文件一起使用,它会引发IOError"而不是目录".

6> 小智..:

查看pyinotify.

inotify在较新的linux中替换了dnotify(来自早期的答案),并允许文件级而不是目录级监视.


不要对这个答案施加阻碍,但在阅读完这篇文章后,我会说它可能不像想象的那样富有吸引力.http://www.serpentine.com/blog/2008/01/04/why-you-should-not-use-pyinotify/

7> Jon Cage..:

在对Tim Golden的剧本进行一些黑客攻击之后,我有以下内容似乎很有效:

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

这可能与负载更多的错误检查有关,但是为了简单地观察日志文件并在将其吐出到屏幕之前对其进行一些处理,这很有效.

感谢大家的意见 - 很棒的东西!



8> 4Oh4..:

为了观看具有轮询和最小依赖性的单个文件,下面是一个基于Deestan(上图)的答案的充实示例:

import os
import sys 
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self.filename = watch_file
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
            print('File changed')
            if self.call_func_on_change is not None:
                self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop        
    def watch(self):
        while self.running: 
            try: 
                # Look for changes
                time.sleep(self.refresh_delay_secs) 
                self.look() 
            except KeyboardInterrupt: 
                print('\nDone') 
                break 
            except FileNotFoundError:
                # Action on file not found
                pass
            except: 
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)

watch_file = 'my_file.txt'

# watcher = Watcher(watch_file)  # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
watcher.watch()  # start the watch going


这对观看多文件也有用吗?

9> redestructa..:

对我来说最简单的解决方案是使用看门狗的工具watchmedo

从https://pypi.python.org/pypi/watchdog我现在有一个进程查找目录中的sql文件并在必要时执行它们.

watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.



10> Bruno De Fra..:

检查我对类似问题的回答.你可以在Python中尝试相同的循环.本页建议:

import time

while 1:
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        print line, # already has newline

另请参阅问题tail()使用Python的文件.



11> 小智..:

好吧,因为您使用的是Python,所以您只需打开一个文件并继续从中读取行.

f = open('file.log')

如果读取的行不为空,则处理它.

line = f.readline()
if line:
    // Do what you want with the line

你可能会想到可以继续打电话readline给EOF.在这种情况下,它将继续返回一个空字符串.当某些内容附加到日志文件后,读取将根据需要从停止的位置继续.

如果您正在寻找使用事件或特定库的解决方案,请在您的问题中指明.否则,我认为这个解决方案很好.



12> 小智..:

以下是Kender代码的简化版本,似乎可以执行相同的操作并且不会导入整个文件:

# Check file for new data.

import time

f = open(r'c:\temp\test.txt', 'r')

while True:

    line = f.readline()
    if not line:
        time.sleep(1)
        print 'Nothing New'
    else:
        print 'Call Function: ', line



13> ronedg..:

这是Tim Goldan脚本的另一种修改,该脚本可在unix类型上运行,并通过使用dict(file => time)添加了一个用于文件修改的简单监视程序。

用法:whateverName.py path_to_dir_to_watch

#!/usr/bin/env python

import os, sys, time

def files_to_timestamp(path):
    files = [os.path.join(path, f) for f in os.listdir(path)]
    return dict ([(f, os.path.getmtime(f)) for f in files])

if __name__ == "__main__":

    path_to_watch = sys.argv[1]
    print('Watching {}..'.format(path_to_watch))

    before = files_to_timestamp(path_to_watch)

    while 1:
        time.sleep (2)
        after = files_to_timestamp(path_to_watch)

        added = [f for f in after.keys() if not f in before.keys()]
        removed = [f for f in before.keys() if not f in after.keys()]
        modified = []

        for f in before.keys():
            if not f in removed:
                if os.path.getmtime(f) != before.get(f):
                    modified.append(f)

        if added: print('Added: {}'.format(', '.join(added)))
        if removed: print('Removed: {}'.format(', '.join(removed)))
        if modified: print('Modified: {}'.format(', '.join(modified)))

        before = after

推荐阅读
N个小灰流_701
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有