
Processing one file in chunks with multiple threads and writing the results to one file

July 30, 2020  | 移动技术网 Tech

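In my tests, though, this approach did not save much time. Possible reasons:

  • A large job was running on the local server at the same time, which lengthened waiting times.
  • That job kept all the CPUs busy, so the threads waited longer to be scheduled and the "concurrent" threads were not really running concurrently.
  • The extra time lost to switching between threads is added to the processing time, so the multithreaded run can end up slower than a single thread, which has almost no switching overhead.
  • In CPython, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so CPU-bound work such as parsing these lines gains little from threads; in addition, every output line is written by reopening the output file under a single shared lock.
  • Readers are welcome to optimise this further and save more time.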
import os
import sys
import time
import re
import threading

# Lock that protects writes to the shared output file
mu = threading.Lock()
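# Subclass threading.Thread; each instance handles one byte range of the input file
class Processer(threading.Thread):
    # Splits a genome position such as "1:12345-12350" into chromosome, start and end
    POS_SPLIT = re.compile(r'[:-]')

    def __init__(self, file_name, start_pos, end_pos):
        super(Processer, self).__init__()
        self.file_name = file_name
        self.start_pos = start_pos
        self.end_pos = end_pos

    def _extract_info(self, line):
        line = line.strip()
        try:
            info = line.split("\t")
            rlts = self.POS_SPLIT.split(info[25])
            seqnames = 'chr' + rlts[0]
            start = int(rlts[1]) - 1  # convert the 1-based start to 0-based
            end = rlts[2]
            # 0-based start, 1-based inclusive end: the interval covers end - start bases
            width = int(end) - start
            strand = info[26]
            cosid = info[17]
            return "\t".join([seqnames, str(start), end, str(width), strand, cosid])
        except (IndexError, ValueError):
            # Header row, blank line or a row without a genome position: show it and skip it
            print(line)
            return None

    def _write_line(self, line):
        # The lock stops lines written by different threads from interleaving
        with mu:
            with open("Cosmic_test.txt", "a") as fout:
                fout.write(line + "\n")

    # Override Thread.run(); start() calls run() in the new thread
    def run(self):
        # Open in binary mode: text-mode seek() only accepts opaque tell() cookies,
        # while this method seeks to raw byte offsets (input assumed to be UTF-8)
        with open(self.file_name, 'rb') as fp:
            # If the block does not start at the beginning of a line, skip the
            # partial line; the previous block reads that line to its end
            if self.start_pos != 0:
                fp.seek(self.start_pos - 1)
                if fp.read(1) != b'\n':
                    fp.readline()
                    self.start_pos = fp.tell()
            fp.seek(self.start_pos)

            # Process every line that starts inside [start_pos, end_pos]
            while self.start_pos <= self.end_pos:
                line = fp.readline()
                if not line:  # end of file
                    break
                cosrlt = self._extract_info(line.decode('utf-8'))
                if cosrlt is not None:
                    self._write_line(cosrlt)
                self.start_pos = fp.tell()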

# Split the input file into byte ranges, one per thread
class Partition(object):
    def __init__(self, file_name, thread_num):
        self.file_name = file_name
        self.block_num = thread_num
        # Write the header row; "w" truncates output left over from a previous run
        with open("Cosmic_test.txt", "w") as fout:
            fout.write("seqnames\tstart\tend\twidth\tstrand\tcosid\n")

    def part(self):
        # Return (start_pos, end_pos) byte ranges, both ends inclusive
        file_size = os.path.getsize(self.file_name)
        block_size = file_size // self.block_num
        pos_lst = list()
        start_pos = 0
        for i in range(self.block_num):
            if start_pos >= file_size:
                break
            if i == self.block_num - 1:
                # The last block takes everything that is left
                end_pos = file_size - 1
            else:
                end_pos = min(start_pos + block_size - 1, file_size - 1)
            pos_lst.append((start_pos, end_pos))
            start_pos = end_pos + 1
        return pos_lst

if __name__ == '__main__':
    # Path of the COSMIC input file; assumed to be passed as the first command-line argument
    cosmic_file = sys.argv[1]
    start_time = time.time()
    thread_num = 20
    pt = Partition(cosmic_file, thread_num)
    pos = pt.part()
    # part() can return fewer than thread_num ranges for a very small file,
    # so create one thread per range
    thd = [Processer(cosmic_file, *p) for p in pos]

    for t in thd:
        t.start()

    for t in thd:
        t.join()

    end_time = time.time()
    print("Cost time is {}".format(end_time - start_time))
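
The post ends by inviting further optimisation. Since the GIL keeps CPython threads from parsing in parallel, one common alternative is to hand each byte range to a separate process and let only the parent write the output, which also removes the per-line lock and file reopen. Below is a minimal sketch, assuming the same tab-separated layout the script relies on (columns 17, 25 and 26) and UTF-8 input. `byte_ranges`, `parse_line`, `process_block` and `run_parallel` are hypothetical names, and the sketch has not been benchmarked against the threaded version:

```python
import os
import re
from concurrent.futures import ProcessPoolExecutor

# Assumed layout, mirroring the threaded script: tab-separated rows with the
# genome position in column 25, the strand in 26 and the COSMIC id in 17
POS_SPLIT = re.compile(r"[:-]")
HEADER = "seqnames\tstart\tend\twidth\tstrand\tcosid\n"


def byte_ranges(path, n_blocks):
    """Split the file into at most n_blocks inclusive (start, end) byte ranges."""
    size = os.path.getsize(path)
    step = max(size // n_blocks, 1)
    ranges = []
    start = 0
    while start < size:
        if len(ranges) == n_blocks - 1:
            end = size - 1  # the last block takes the remainder
        else:
            end = min(start + step - 1, size - 1)
        ranges.append((start, end))
        start = end + 1
    return ranges


def parse_line(line):
    """Turn one input row into an output row, or return None if it cannot be parsed."""
    info = line.strip().split("\t")
    try:
        chrom, pos_start, pos_end = POS_SPLIT.split(info[25])[:3]
        start = int(pos_start) - 1  # 0-based start
        width = int(pos_end) - start  # bases covered by the interval
        return "\t".join(["chr" + chrom, str(start), pos_end, str(width), info[26], info[17]])
    except (IndexError, ValueError):
        return None


def process_block(task):
    """Worker: parse every line that starts inside the byte range [start, end]."""
    path, start, end = task
    results = []
    with open(path, "rb") as fp:
        if start != 0:
            fp.seek(start - 1)
            if fp.read(1) != b"\n":
                fp.readline()  # partial line: the previous block owns it
        pos = fp.tell()
        while pos <= end:
            line = fp.readline()
            if not line:
                break
            row = parse_line(line.decode("utf-8"))
            if row is not None:
                results.append(row)
            pos = fp.tell()
    return results


def run_parallel(in_path, out_path, n_workers=4):
    """Parse the blocks in worker processes; only this (parent) process writes."""
    tasks = [(in_path, s, e) for s, e in byte_ranges(in_path, n_workers)]
    with ProcessPoolExecutor(max_workers=n_workers) as pool, open(out_path, "w") as fout:
        fout.write(HEADER)
        # map() returns results in task order, so the output keeps the input order
        for rows in pool.map(process_block, tasks):
            for row in rows:
                fout.write(row + "\n")
```

Call `run_parallel(sys.argv[1], "Cosmic_test.txt", os.cpu_count() or 4)` from under an `if __name__ == "__main__":` guard: where worker processes are started with spawn (the default on Windows and macOS), the main module is re-imported in every worker. Each worker returns its rows as a list, so memory use grows with the output size; that is the cost of having a single writer with no lock.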

Original article: https://blog.csdn.net/nixiang_888/article/details/107636421
